Skip to main content

harn_vm/triggers/
event.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
/// Replacement text that `redact_headers` stores in place of a redacted header value.
const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
14#[serde(transparent)]
15pub struct TriggerEventId(pub String);
16
17impl TriggerEventId {
18    pub fn new() -> Self {
19        Self(format!("trigger_evt_{}", Uuid::now_v7()))
20    }
21}
22
23impl Default for TriggerEventId {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
30#[serde(transparent)]
31pub struct ProviderId(pub String);
32
33impl ProviderId {
34    pub fn new(value: impl Into<String>) -> Self {
35        Self(value.into())
36    }
37
38    pub fn as_str(&self) -> &str {
39        self.0.as_str()
40    }
41}
42
43impl From<&str> for ProviderId {
44    fn from(value: &str) -> Self {
45        Self::new(value)
46    }
47}
48
49impl From<String> for ProviderId {
50    fn from(value: String) -> Self {
51        Self::new(value)
52    }
53}
54
55#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
56#[serde(transparent)]
57pub struct TraceId(pub String);
58
59impl TraceId {
60    pub fn new() -> Self {
61        Self(format!("trace_{}", Uuid::now_v7()))
62    }
63}
64
65impl Default for TraceId {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
71#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
72#[serde(transparent)]
73pub struct TenantId(pub String);
74
75impl TenantId {
76    pub fn new(value: impl Into<String>) -> Self {
77        Self(value.into())
78    }
79}
80
/// Signature state recorded on a `TriggerEvent`.
///
/// Serialized as an internally tagged object: the `state` key holds the
/// snake_case variant name (`verified`, `unsigned` or `failed`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum SignatureStatus {
    /// Serialized as `{"state": "verified"}`.
    Verified,
    /// Serialized as `{"state": "unsigned"}`.
    Unsigned,
    /// Serialized as `{"state": "failed", "reason": ...}`; `reason` is free-form text.
    Failed { reason: String },
}
88
/// Fields shared by every GitHub payload variant.
///
/// Each typed GitHub payload struct embeds this via `#[serde(flatten)]`, and
/// `GitHubEventPayload::Other` carries it on its own.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubEventCommon {
    pub event: String,
    pub action: Option<String>,
    pub delivery_id: Option<String>,
    pub installation_id: Option<i64>,
    // Untyped JSON; presumably the original webhook body — confirm against the normalizer.
    pub raw: JsonValue,
}
97
/// GitHub payload carrying an untyped `issue` object next to the common fields.
///
/// `common` is flattened, so its keys sit at the same JSON level as `issue`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubIssuesEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub issue: JsonValue,
}
104
/// GitHub payload carrying an untyped `pull_request` object next to the common fields.
///
/// `common` is flattened, so its keys sit at the same JSON level as `pull_request`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPullRequestEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub pull_request: JsonValue,
}
111
/// GitHub payload carrying untyped `issue` and `comment` objects next to the
/// common fields.
///
/// `common` is flattened, so its keys sit at the same JSON level as the others.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubIssueCommentEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub issue: JsonValue,
    pub comment: JsonValue,
}
119
/// GitHub payload carrying untyped `pull_request` and `review` objects next to
/// the common fields.
///
/// `common` is flattened, so its keys sit at the same JSON level as the others.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPullRequestReviewEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub pull_request: JsonValue,
    pub review: JsonValue,
}
127
/// GitHub payload carrying a list of untyped commits next to the common fields.
///
/// Neither extra field is required when deserializing: `commits` falls back to
/// an empty list and `distinct_size` is optional.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPushEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    // `default` makes a missing `commits` key deserialize as an empty vector.
    #[serde(default)]
    pub commits: Vec<JsonValue>,
    pub distinct_size: Option<i64>,
}
136
/// GitHub payload carrying an untyped `workflow_run` object next to the common fields.
///
/// `common` is flattened, so its keys sit at the same JSON level as `workflow_run`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubWorkflowRunEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub workflow_run: JsonValue,
}
143
144#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
145#[serde(untagged)]
146pub enum GitHubEventPayload {
147    Issues(GitHubIssuesEventPayload),
148    PullRequest(GitHubPullRequestEventPayload),
149    IssueComment(GitHubIssueCommentEventPayload),
150    PullRequestReview(GitHubPullRequestReviewEventPayload),
151    Push(GitHubPushEventPayload),
152    WorkflowRun(GitHubWorkflowRunEventPayload),
153    Other(GitHubEventCommon),
154}
155
/// Fields shared by every Slack payload variant.
///
/// Each typed Slack payload struct embeds this via `#[serde(flatten)]`, and
/// `SlackEventPayload::Other` carries it on its own.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackEventCommon {
    pub event: String,
    pub event_id: Option<String>,
    pub api_app_id: Option<String>,
    pub team_id: Option<String>,
    pub channel_id: Option<String>,
    pub user_id: Option<String>,
    pub event_ts: Option<String>,
    // Untyped JSON; presumably the original event body — confirm against the normalizer.
    pub raw: JsonValue,
}
167
/// Slack message payload: the common fields plus optional message details.
///
/// Every field besides the flattened `common` is optional, so any object that
/// holds the common fields deserializes into this struct.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackMessageEventPayload {
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub subtype: Option<String>,
    pub channel_type: Option<String>,
    pub channel: Option<String>,
    pub user: Option<String>,
    pub text: Option<String>,
    pub ts: Option<String>,
    pub thread_ts: Option<String>,
}
180
/// Slack app-mention payload: the common fields plus optional message details.
///
/// Its extra fields are a subset of those on `SlackMessageEventPayload`, and
/// all of them are optional.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackAppMentionEventPayload {
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub channel: Option<String>,
    pub user: Option<String>,
    pub text: Option<String>,
    pub ts: Option<String>,
    pub thread_ts: Option<String>,
}
191
/// Slack reaction-added payload: the common fields plus reaction details.
///
/// `item` is the only required extra key; `reaction` and `item_user` are optional.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackReactionAddedEventPayload {
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub reaction: Option<String>,
    pub item_user: Option<String>,
    pub item: JsonValue,
}
200
/// Slack app-home-opened payload: the common fields plus view details.
///
/// `view` is the only required extra key; `user`, `channel` and `tab` are optional.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackAppHomeOpenedEventPayload {
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub user: Option<String>,
    pub channel: Option<String>,
    pub tab: Option<String>,
    pub view: JsonValue,
}
210
/// Slack assistant-thread-started payload: the common fields plus thread details.
///
/// `assistant_thread` and `context` are required untyped JSON values;
/// `thread_ts` is optional.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackAssistantThreadStartedEventPayload {
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub assistant_thread: JsonValue,
    pub thread_ts: Option<String>,
    pub context: JsonValue,
}
219
220#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
221#[serde(untagged)]
222pub enum SlackEventPayload {
223    Message(SlackMessageEventPayload),
224    AppMention(SlackAppMentionEventPayload),
225    ReactionAdded(SlackReactionAddedEventPayload),
226    AppHomeOpened(SlackAppHomeOpenedEventPayload),
227    AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
228    Other(SlackEventCommon),
229}
230
/// Fields shared by every Linear payload variant.
///
/// Each typed Linear payload struct embeds this via `#[serde(flatten)]`, and
/// `LinearEventPayload::Other` carries it on its own.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearEventCommon {
    pub event: String,
    pub action: Option<String>,
    pub delivery_id: Option<String>,
    pub organization_id: Option<String>,
    // Integer timestamp; unit is not visible here — TODO confirm (seconds vs. milliseconds).
    pub webhook_timestamp: Option<i64>,
    pub webhook_id: Option<String>,
    pub url: Option<String>,
    pub created_at: Option<String>,
    pub actor: JsonValue,
    // Untyped JSON; presumably the original webhook body — confirm against the normalizer.
    pub raw: JsonValue,
}
244
/// One changed field of a Linear issue, together with the field's previous value.
///
/// `Serialize` and `Deserialize` are implemented by hand for this type: the
/// wire form is an object with a `field_name` discriminator (the snake_case
/// variant name, or `"other"`) and a `previous` key.
#[derive(Clone, Debug, PartialEq)]
pub enum LinearIssueChange {
    Title { previous: Option<String> },
    Description { previous: Option<String> },
    Priority { previous: Option<i64> },
    Estimate { previous: Option<i64> },
    StateId { previous: Option<String> },
    TeamId { previous: Option<String> },
    AssigneeId { previous: Option<String> },
    ProjectId { previous: Option<String> },
    CycleId { previous: Option<String> },
    DueDate { previous: Option<String> },
    ParentId { previous: Option<String> },
    SortOrder { previous: Option<f64> },
    LabelIds { previous: Vec<String> },
    CompletedAt { previous: Option<String> },
    /// Any field without a dedicated variant; `field` names it and `previous`
    /// keeps the prior value as untyped JSON.
    Other { field: String, previous: JsonValue },
}
263
264impl Serialize for LinearIssueChange {
265    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
266    where
267        S: Serializer,
268    {
269        let value = match self {
270            Self::Title { previous } => {
271                serde_json::json!({ "field_name": "title", "previous": previous })
272            }
273            Self::Description { previous } => {
274                serde_json::json!({ "field_name": "description", "previous": previous })
275            }
276            Self::Priority { previous } => {
277                serde_json::json!({ "field_name": "priority", "previous": previous })
278            }
279            Self::Estimate { previous } => {
280                serde_json::json!({ "field_name": "estimate", "previous": previous })
281            }
282            Self::StateId { previous } => {
283                serde_json::json!({ "field_name": "state_id", "previous": previous })
284            }
285            Self::TeamId { previous } => {
286                serde_json::json!({ "field_name": "team_id", "previous": previous })
287            }
288            Self::AssigneeId { previous } => {
289                serde_json::json!({ "field_name": "assignee_id", "previous": previous })
290            }
291            Self::ProjectId { previous } => {
292                serde_json::json!({ "field_name": "project_id", "previous": previous })
293            }
294            Self::CycleId { previous } => {
295                serde_json::json!({ "field_name": "cycle_id", "previous": previous })
296            }
297            Self::DueDate { previous } => {
298                serde_json::json!({ "field_name": "due_date", "previous": previous })
299            }
300            Self::ParentId { previous } => {
301                serde_json::json!({ "field_name": "parent_id", "previous": previous })
302            }
303            Self::SortOrder { previous } => {
304                serde_json::json!({ "field_name": "sort_order", "previous": previous })
305            }
306            Self::LabelIds { previous } => {
307                serde_json::json!({ "field_name": "label_ids", "previous": previous })
308            }
309            Self::CompletedAt { previous } => {
310                serde_json::json!({ "field_name": "completed_at", "previous": previous })
311            }
312            Self::Other { field, previous } => {
313                serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
314            }
315        };
316        value.serialize(serializer)
317    }
318}
319
320impl<'de> Deserialize<'de> for LinearIssueChange {
321    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
322    where
323        D: Deserializer<'de>,
324    {
325        let value = JsonValue::deserialize(deserializer)?;
326        let field_name = value
327            .get("field_name")
328            .and_then(JsonValue::as_str)
329            .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
330        let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
331        Ok(match field_name {
332            "title" => Self::Title {
333                previous: previous.as_str().map(ToString::to_string),
334            },
335            "description" => Self::Description {
336                previous: previous.as_str().map(ToString::to_string),
337            },
338            "priority" => Self::Priority {
339                previous: parse_json_i64ish(&previous),
340            },
341            "estimate" => Self::Estimate {
342                previous: parse_json_i64ish(&previous),
343            },
344            "state_id" => Self::StateId {
345                previous: previous.as_str().map(ToString::to_string),
346            },
347            "team_id" => Self::TeamId {
348                previous: previous.as_str().map(ToString::to_string),
349            },
350            "assignee_id" => Self::AssigneeId {
351                previous: previous.as_str().map(ToString::to_string),
352            },
353            "project_id" => Self::ProjectId {
354                previous: previous.as_str().map(ToString::to_string),
355            },
356            "cycle_id" => Self::CycleId {
357                previous: previous.as_str().map(ToString::to_string),
358            },
359            "due_date" => Self::DueDate {
360                previous: previous.as_str().map(ToString::to_string),
361            },
362            "parent_id" => Self::ParentId {
363                previous: previous.as_str().map(ToString::to_string),
364            },
365            "sort_order" => Self::SortOrder {
366                previous: previous.as_f64(),
367            },
368            "label_ids" => Self::LabelIds {
369                previous: parse_string_array(&previous),
370            },
371            "completed_at" => Self::CompletedAt {
372                previous: previous.as_str().map(ToString::to_string),
373            },
374            "other" => Self::Other {
375                field: value
376                    .get("field")
377                    .and_then(JsonValue::as_str)
378                    .map(ToString::to_string)
379                    .unwrap_or_else(|| "unknown".to_string()),
380                previous,
381            },
382            other => Self::Other {
383                field: other.to_string(),
384                previous,
385            },
386        })
387    }
388}
389
/// Linear payload carrying an untyped `issue` object and the list of changed
/// fields next to the common fields.
///
/// `common` is flattened, so its keys sit at the same JSON level as `issue`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearIssueEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub issue: JsonValue,
    // `default` makes a missing `changes` key deserialize as an empty vector.
    #[serde(default)]
    pub changes: Vec<LinearIssueChange>,
}
398
/// Linear payload carrying an untyped `comment` object next to the common fields.
///
/// `common` is flattened, so its keys sit at the same JSON level as `comment`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearIssueCommentEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub comment: JsonValue,
}
405
/// Linear payload carrying an untyped `label` object next to the common fields.
///
/// `common` is flattened, so its keys sit at the same JSON level as `label`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearIssueLabelEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub label: JsonValue,
}
412
/// Linear payload carrying an untyped `project` object next to the common fields.
///
/// `common` is flattened, so its keys sit at the same JSON level as `project`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearProjectEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub project: JsonValue,
}
419
/// Linear payload carrying an untyped `cycle` object next to the common fields.
///
/// `common` is flattened, so its keys sit at the same JSON level as `cycle`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearCycleEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub cycle: JsonValue,
}
426
/// Linear payload carrying an untyped `customer` object next to the common fields.
///
/// `common` is flattened, so its keys sit at the same JSON level as `customer`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearCustomerEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub customer: JsonValue,
}
433
/// Linear payload carrying an untyped `customer_request` object next to the
/// common fields.
///
/// `common` is flattened, so its keys sit at the same JSON level as
/// `customer_request`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearCustomerRequestEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub customer_request: JsonValue,
}
440
/// Typed Linear payload, serialized without any variant tag.
///
/// Untagged deserialization tries the variants in declaration order. Every
/// typed variant requires a key none of the others serialize (`issue`,
/// `comment`, `label`, `project`, `cycle`, `customer`, `customer_request`),
/// and `Other`, which needs only the common fields, is tried last.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum LinearEventPayload {
    Issue(LinearIssueEventPayload),
    IssueComment(LinearIssueCommentEventPayload),
    IssueLabel(LinearIssueLabelEventPayload),
    Project(LinearProjectEventPayload),
    Cycle(LinearCycleEventPayload),
    Customer(LinearCustomerEventPayload),
    CustomerRequest(LinearCustomerRequestEventPayload),
    Other(LinearEventCommon),
}
453
/// Change record carried in `NotionEventPayload::polled`.
///
/// Holds identifiers for the changed entity plus optional `before` and
/// required `after` values as untyped JSON.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct NotionPolledChangeEvent {
    pub resource: String,
    pub source_id: String,
    pub entity_id: String,
    // Presumably the polling cursor this change was observed at — TODO confirm with the poller.
    pub high_water_mark: String,
    pub before: Option<JsonValue>,
    pub after: JsonValue,
}
463
/// Notion payload. Only `event` and `raw` are required; every other field is
/// optional.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct NotionEventPayload {
    pub event: String,
    pub workspace_id: Option<String>,
    pub request_id: Option<String>,
    pub subscription_id: Option<String>,
    pub integration_id: Option<String>,
    pub attempt_number: Option<u32>,
    pub entity_id: Option<String>,
    pub entity_type: Option<String>,
    pub api_version: Option<String>,
    pub verification_token: Option<String>,
    // Presumably set only for events produced by polling rather than a webhook — TODO confirm.
    pub polled: Option<NotionPolledChangeEvent>,
    // Untyped JSON; presumably the original provider body — confirm against the normalizer.
    pub raw: JsonValue,
}
479
/// Cron payload: an optional job id and schedule string plus the tick time.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct CronEventPayload {
    pub cron_id: Option<String>,
    pub schedule: Option<String>,
    // Written and read as an RFC 3339 timestamp string.
    #[serde(with = "time::serde::rfc3339")]
    pub tick_at: OffsetDateTime,
    pub raw: JsonValue,
}
488
/// Generic webhook payload: optional `source` and `content_type` strings plus
/// the untyped `raw` JSON value.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GenericWebhookPayload {
    pub source: Option<String>,
    pub content_type: Option<String>,
    pub raw: JsonValue,
}
495
/// A2A push payload: optional `task_id` and `sender` strings plus the untyped
/// `raw` JSON value.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct A2aPushPayload {
    pub task_id: Option<String>,
    pub sender: Option<String>,
    pub raw: JsonValue,
}
502
/// Payload for a provider without a `KnownProviderPayload` variant.
///
/// The provider name and schema name are stored alongside the untyped `raw`
/// JSON value.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ExtensionProviderPayload {
    pub provider: String,
    pub schema_name: String,
    pub raw: JsonValue,
}
509
510#[allow(clippy::large_enum_variant)]
511#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
512#[serde(untagged)]
513pub enum ProviderPayload {
514    Known(KnownProviderPayload),
515    Extension(ExtensionProviderPayload),
516}
517
518impl ProviderPayload {
519    pub fn provider(&self) -> &str {
520        match self {
521            Self::Known(known) => known.provider(),
522            Self::Extension(payload) => payload.provider.as_str(),
523        }
524    }
525
526    pub fn normalize(
527        provider: &ProviderId,
528        kind: &str,
529        headers: &BTreeMap<String, String>,
530        raw: JsonValue,
531    ) -> Result<Self, ProviderCatalogError> {
532        provider_catalog()
533            .read()
534            .expect("provider catalog poisoned")
535            .normalize(provider, kind, headers, raw)
536    }
537}
538
539#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
540#[serde(tag = "provider")]
541pub enum KnownProviderPayload {
542    #[serde(rename = "github")]
543    GitHub(GitHubEventPayload),
544    #[serde(rename = "slack")]
545    Slack(Box<SlackEventPayload>),
546    #[serde(rename = "linear")]
547    Linear(LinearEventPayload),
548    #[serde(rename = "notion")]
549    Notion(Box<NotionEventPayload>),
550    #[serde(rename = "cron")]
551    Cron(CronEventPayload),
552    #[serde(rename = "webhook")]
553    Webhook(GenericWebhookPayload),
554    #[serde(rename = "a2a-push")]
555    A2aPush(A2aPushPayload),
556}
557
558impl KnownProviderPayload {
559    pub fn provider(&self) -> &str {
560        match self {
561            Self::GitHub(_) => "github",
562            Self::Slack(_) => "slack",
563            Self::Linear(_) => "linear",
564            Self::Notion(_) => "notion",
565            Self::Cron(_) => "cron",
566            Self::Webhook(_) => "webhook",
567            Self::A2aPush(_) => "a2a-push",
568        }
569    }
570}
571
572#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
573pub struct TriggerEvent {
574    pub id: TriggerEventId,
575    pub provider: ProviderId,
576    pub kind: String,
577    #[serde(with = "time::serde::rfc3339")]
578    pub received_at: OffsetDateTime,
579    #[serde(with = "time::serde::rfc3339::option")]
580    pub occurred_at: Option<OffsetDateTime>,
581    pub dedupe_key: String,
582    pub trace_id: TraceId,
583    pub tenant_id: Option<TenantId>,
584    pub headers: BTreeMap<String, String>,
585    #[serde(default, skip_serializing_if = "Option::is_none")]
586    pub batch: Option<Vec<JsonValue>>,
587    pub provider_payload: ProviderPayload,
588    pub signature_status: SignatureStatus,
589    #[serde(skip)]
590    pub dedupe_claimed: bool,
591}
592
593impl TriggerEvent {
594    #[allow(clippy::too_many_arguments)]
595    pub fn new(
596        provider: ProviderId,
597        kind: impl Into<String>,
598        occurred_at: Option<OffsetDateTime>,
599        dedupe_key: impl Into<String>,
600        tenant_id: Option<TenantId>,
601        headers: BTreeMap<String, String>,
602        provider_payload: ProviderPayload,
603        signature_status: SignatureStatus,
604    ) -> Self {
605        Self {
606            id: TriggerEventId::new(),
607            provider,
608            kind: kind.into(),
609            received_at: clock::now_utc(),
610            occurred_at,
611            dedupe_key: dedupe_key.into(),
612            trace_id: TraceId::new(),
613            tenant_id,
614            headers,
615            batch: None,
616            provider_payload,
617            signature_status,
618            dedupe_claimed: false,
619        }
620    }
621
622    pub fn dedupe_claimed(&self) -> bool {
623        self.dedupe_claimed
624    }
625
626    pub fn mark_dedupe_claimed(&mut self) {
627        self.dedupe_claimed = true;
628    }
629}
630
/// Decides which header values `redact_headers` must hide.
///
/// Holds a set of lower-cased header names that are always safe to keep.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRedactionPolicy {
    safe_exact_names: BTreeSet<String>,
}

impl HeaderRedactionPolicy {
    /// Adds `name` (compared ASCII case-insensitively) to the always-keep set
    /// and returns the updated policy.
    pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
        let lowered = name.into().to_ascii_lowercase();
        self.safe_exact_names.insert(lowered);
        self
    }

    /// True when the header is explicitly safe: it is in the policy's set, is
    /// one of the built-in names, ends in `-event` or `-delivery`, or
    /// contains `timestamp` or `request-id`. Matching is ASCII
    /// case-insensitive.
    fn should_keep(&self, name: &str) -> bool {
        // Built-in names that are kept whatever the policy's own set holds.
        const BUILTIN_SAFE: [&str; 16] = [
            "user-agent",
            "request-id",
            "x-request-id",
            "x-correlation-id",
            "content-type",
            "content-length",
            "x-github-event",
            "x-github-delivery",
            "x-github-hook-id",
            "x-hub-signature-256",
            "x-slack-request-timestamp",
            "x-slack-signature",
            "x-linear-signature",
            "x-notion-signature",
            "x-a2a-signature",
            "x-a2a-delivery",
        ];

        let lowered = name.to_ascii_lowercase();
        let header = lowered.as_str();
        self.safe_exact_names.contains(header)
            || BUILTIN_SAFE.contains(&header)
            || header.ends_with("-event")
            || header.ends_with("-delivery")
            || header.contains("timestamp")
            || header.contains("request-id")
    }

    /// True when the header is not explicitly safe and its lower-cased name
    /// contains one of `authorization`, `cookie`, `secret`, `token` or `key`.
    fn should_redact(&self, name: &str) -> bool {
        const SENSITIVE_FRAGMENTS: [&str; 5] =
            ["authorization", "cookie", "secret", "token", "key"];

        let lowered = name.to_ascii_lowercase();
        !self.should_keep(&lowered)
            && SENSITIVE_FRAGMENTS
                .iter()
                .any(|fragment| lowered.contains(*fragment))
    }
}

impl Default for HeaderRedactionPolicy {
    /// Policy whose always-keep set holds the sixteen built-in header names.
    fn default() -> Self {
        let safe_exact_names = [
            "content-length",
            "content-type",
            "request-id",
            "user-agent",
            "x-a2a-delivery",
            "x-a2a-signature",
            "x-correlation-id",
            "x-github-delivery",
            "x-github-event",
            "x-github-hook-id",
            "x-hub-signature-256",
            "x-linear-signature",
            "x-notion-signature",
            "x-request-id",
            "x-slack-request-timestamp",
            "x-slack-signature",
        ]
        .into_iter()
        .map(String::from)
        .collect();

        Self { safe_exact_names }
    }
}
709
710pub fn redact_headers(
711    headers: &BTreeMap<String, String>,
712    policy: &HeaderRedactionPolicy,
713) -> BTreeMap<String, String> {
714    headers
715        .iter()
716        .map(|(name, value)| {
717            if policy.should_redact(name) {
718                (name.clone(), REDACTED_HEADER_VALUE.to_string())
719            } else {
720                (name.clone(), value.clone())
721            }
722        })
723        .collect()
724}
725
/// A secret a provider declares in its metadata, identified by `name` within
/// `namespace`; `required` distinguishes mandatory from optional secrets.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderSecretRequirement {
    pub name: String,
    pub required: bool,
    pub namespace: String,
}
732
/// Name of an outbound method listed in a provider's metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderOutboundMethod {
    pub name: String,
}
737
/// Description of how a provider's deliveries are signed.
///
/// Serialized internally tagged: the `kind` key holds `none` or `hmac`.
/// The default is `None`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum SignatureVerificationMetadata {
    /// No signature scheme is described.
    #[default]
    None,
    /// HMAC-based scheme; the fields name the headers, digest and encoding involved.
    Hmac {
        variant: String,
        raw_body: bool,
        signature_header: String,
        timestamp_header: Option<String>,
        id_header: Option<String>,
        // Presumably the allowed clock skew in seconds, going by the name — TODO confirm.
        default_tolerance_secs: Option<i64>,
        digest: String,
        encoding: String,
    },
}
754
/// Runtime information stored in a provider's metadata.
///
/// Serialized internally tagged: the `kind` key holds `builtin` or
/// `placeholder`. The default is `Placeholder`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ProviderRuntimeMetadata {
    /// Names a connector and, optionally, its default signature variant.
    Builtin {
        connector: String,
        default_signature_variant: Option<String>,
    },
    /// Carries no runtime information.
    #[default]
    Placeholder,
}
765
766#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
767pub struct ProviderMetadata {
768    pub provider: String,
769    #[serde(default)]
770    pub kinds: Vec<String>,
771    pub schema_name: String,
772    #[serde(default)]
773    pub outbound_methods: Vec<ProviderOutboundMethod>,
774    #[serde(default)]
775    pub secret_requirements: Vec<ProviderSecretRequirement>,
776    #[serde(default)]
777    pub signature_verification: SignatureVerificationMetadata,
778    #[serde(default)]
779    pub runtime: ProviderRuntimeMetadata,
780}
781
782impl ProviderMetadata {
783    pub fn supports_kind(&self, kind: &str) -> bool {
784        self.kinds.iter().any(|candidate| candidate == kind)
785    }
786
787    pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
788        self.secret_requirements
789            .iter()
790            .filter(|requirement| requirement.required)
791            .map(|requirement| requirement.name.as_str())
792    }
793}
794
795pub trait ProviderSchema: Send + Sync {
796    fn provider_id(&self) -> &'static str;
797    fn harn_schema_name(&self) -> &'static str;
798    fn metadata(&self) -> ProviderMetadata {
799        ProviderMetadata {
800            provider: self.provider_id().to_string(),
801            schema_name: self.harn_schema_name().to_string(),
802            ..ProviderMetadata::default()
803        }
804    }
805    fn normalize(
806        &self,
807        kind: &str,
808        headers: &BTreeMap<String, String>,
809        raw: JsonValue,
810    ) -> Result<ProviderPayload, ProviderCatalogError>;
811}
812
/// Failure reported by `ProviderCatalog` operations; each variant carries the
/// provider name involved.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProviderCatalogError {
    /// A provider with this name is already in the catalog.
    DuplicateProvider(String),
    /// No provider with this name is in the catalog.
    UnknownProvider(String),
}

impl std::fmt::Display for ProviderCatalogError {
    /// Writes a one-line message naming the provider in backticks.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ProviderCatalogError::DuplicateProvider(provider) => {
                f.write_fmt(format_args!("provider `{provider}` is already registered"))
            }
            ProviderCatalogError::UnknownProvider(provider) => {
                f.write_fmt(format_args!("provider `{provider}` is not registered"))
            }
        }
    }
}

impl std::error::Error for ProviderCatalogError {}
831
832#[derive(Clone, Default)]
833pub struct ProviderCatalog {
834    providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
835}
836
837impl ProviderCatalog {
838    pub fn with_defaults() -> Self {
839        let mut catalog = Self::default();
840        for schema in default_provider_schemas() {
841            catalog
842                .register(schema)
843                .expect("default providers must register cleanly");
844        }
845        catalog
846    }
847
848    pub fn register(
849        &mut self,
850        schema: Arc<dyn ProviderSchema>,
851    ) -> Result<(), ProviderCatalogError> {
852        let provider = schema.provider_id().to_string();
853        if self.providers.contains_key(provider.as_str()) {
854            return Err(ProviderCatalogError::DuplicateProvider(provider));
855        }
856        self.providers.insert(provider, schema);
857        Ok(())
858    }
859
860    pub fn normalize(
861        &self,
862        provider: &ProviderId,
863        kind: &str,
864        headers: &BTreeMap<String, String>,
865        raw: JsonValue,
866    ) -> Result<ProviderPayload, ProviderCatalogError> {
867        let schema = self
868            .providers
869            .get(provider.as_str())
870            .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
871        schema.normalize(kind, headers, raw)
872    }
873
874    pub fn schema_names(&self) -> BTreeMap<String, String> {
875        self.providers
876            .iter()
877            .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
878            .collect()
879    }
880
881    pub fn entries(&self) -> Vec<ProviderMetadata> {
882        self.providers
883            .values()
884            .map(|schema| schema.metadata())
885            .collect()
886    }
887
888    pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
889        self.providers.get(provider).map(|schema| schema.metadata())
890    }
891}
892
893pub fn register_provider_schema(
894    schema: Arc<dyn ProviderSchema>,
895) -> Result<(), ProviderCatalogError> {
896    provider_catalog()
897        .write()
898        .expect("provider catalog poisoned")
899        .register(schema)
900}
901
902pub fn reset_provider_catalog() {
903    *provider_catalog()
904        .write()
905        .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
906}
907
908pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
909    provider_catalog()
910        .read()
911        .expect("provider catalog poisoned")
912        .schema_names()
913}
914
915pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
916    provider_catalog()
917        .read()
918        .expect("provider catalog poisoned")
919        .entries()
920}
921
922pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
923    provider_catalog()
924        .read()
925        .expect("provider catalog poisoned")
926        .metadata_for(provider)
927}
928
929fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
930    static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
931    PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
932}
933
934struct BuiltinProviderSchema {
935    provider_id: &'static str,
936    harn_schema_name: &'static str,
937    metadata: ProviderMetadata,
938    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
939}
940
941impl ProviderSchema for BuiltinProviderSchema {
942    fn provider_id(&self) -> &'static str {
943        self.provider_id
944    }
945
946    fn harn_schema_name(&self) -> &'static str {
947        self.harn_schema_name
948    }
949
950    fn metadata(&self) -> ProviderMetadata {
951        self.metadata.clone()
952    }
953
954    fn normalize(
955        &self,
956        kind: &str,
957        headers: &BTreeMap<String, String>,
958        raw: JsonValue,
959    ) -> Result<ProviderPayload, ProviderCatalogError> {
960        Ok((self.normalize)(kind, headers, raw))
961    }
962}
963
964fn provider_metadata_entry(
965    provider: &str,
966    kinds: &[&str],
967    schema_name: &str,
968    outbound_methods: &[&str],
969    signature_verification: SignatureVerificationMetadata,
970    secret_requirements: Vec<ProviderSecretRequirement>,
971    runtime: ProviderRuntimeMetadata,
972) -> ProviderMetadata {
973    ProviderMetadata {
974        provider: provider.to_string(),
975        kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
976        schema_name: schema_name.to_string(),
977        outbound_methods: outbound_methods
978            .iter()
979            .map(|name| ProviderOutboundMethod {
980                name: (*name).to_string(),
981            })
982            .collect(),
983        secret_requirements,
984        signature_verification,
985        runtime,
986    }
987}
988
989fn hmac_signature_metadata(
990    variant: &str,
991    signature_header: &str,
992    timestamp_header: Option<&str>,
993    id_header: Option<&str>,
994    default_tolerance_secs: Option<i64>,
995    encoding: &str,
996) -> SignatureVerificationMetadata {
997    SignatureVerificationMetadata::Hmac {
998        variant: variant.to_string(),
999        raw_body: true,
1000        signature_header: signature_header.to_string(),
1001        timestamp_header: timestamp_header.map(ToString::to_string),
1002        id_header: id_header.map(ToString::to_string),
1003        default_tolerance_secs,
1004        digest: "sha256".to_string(),
1005        encoding: encoding.to_string(),
1006    }
1007}
1008
1009fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1010    ProviderSecretRequirement {
1011        name: name.to_string(),
1012        required: true,
1013        namespace: namespace.to_string(),
1014    }
1015}
1016
1017fn outbound_method(name: &str) -> ProviderOutboundMethod {
1018    ProviderOutboundMethod {
1019        name: name.to_string(),
1020    }
1021}
1022
1023fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1024    ProviderSecretRequirement {
1025        name: name.to_string(),
1026        required: false,
1027        namespace: namespace.to_string(),
1028    }
1029}
1030
1031fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1032    vec![
1033        Arc::new(BuiltinProviderSchema {
1034            provider_id: "github",
1035            harn_schema_name: "GitHubEventPayload",
1036            metadata: provider_metadata_entry(
1037                "github",
1038                &["webhook"],
1039                "GitHubEventPayload",
1040                &[],
1041                hmac_signature_metadata(
1042                    "github",
1043                    "X-Hub-Signature-256",
1044                    None,
1045                    Some("X-GitHub-Delivery"),
1046                    None,
1047                    "hex",
1048                ),
1049                vec![required_secret("signing_secret", "github")],
1050                ProviderRuntimeMetadata::Builtin {
1051                    connector: "webhook".to_string(),
1052                    default_signature_variant: Some("github".to_string()),
1053                },
1054            ),
1055            normalize: github_payload,
1056        }),
1057        Arc::new(BuiltinProviderSchema {
1058            provider_id: "slack",
1059            harn_schema_name: "SlackEventPayload",
1060            metadata: provider_metadata_entry(
1061                "slack",
1062                &["webhook"],
1063                "SlackEventPayload",
1064                &[
1065                    "post_message",
1066                    "update_message",
1067                    "add_reaction",
1068                    "open_view",
1069                    "user_info",
1070                    "api_call",
1071                    "upload_file",
1072                ],
1073                hmac_signature_metadata(
1074                    "slack",
1075                    "X-Slack-Signature",
1076                    Some("X-Slack-Request-Timestamp"),
1077                    None,
1078                    Some(300),
1079                    "hex",
1080                ),
1081                vec![required_secret("signing_secret", "slack")],
1082                ProviderRuntimeMetadata::Builtin {
1083                    connector: "slack".to_string(),
1084                    default_signature_variant: Some("slack".to_string()),
1085                },
1086            ),
1087            normalize: slack_payload,
1088        }),
1089        Arc::new(BuiltinProviderSchema {
1090            provider_id: "linear",
1091            harn_schema_name: "LinearEventPayload",
1092            metadata: {
1093                let mut metadata = provider_metadata_entry(
1094                    "linear",
1095                    &["webhook"],
1096                    "LinearEventPayload",
1097                    &[],
1098                    hmac_signature_metadata(
1099                        "linear",
1100                        "Linear-Signature",
1101                        None,
1102                        Some("Linear-Delivery"),
1103                        Some(75),
1104                        "hex",
1105                    ),
1106                    vec![
1107                        required_secret("signing_secret", "linear"),
1108                        optional_secret("access_token", "linear"),
1109                    ],
1110                    ProviderRuntimeMetadata::Builtin {
1111                        connector: "linear".to_string(),
1112                        default_signature_variant: Some("linear".to_string()),
1113                    },
1114                );
1115                metadata.outbound_methods = vec![
1116                    ProviderOutboundMethod {
1117                        name: "list_issues".to_string(),
1118                    },
1119                    ProviderOutboundMethod {
1120                        name: "update_issue".to_string(),
1121                    },
1122                    ProviderOutboundMethod {
1123                        name: "create_comment".to_string(),
1124                    },
1125                    ProviderOutboundMethod {
1126                        name: "search".to_string(),
1127                    },
1128                    ProviderOutboundMethod {
1129                        name: "graphql".to_string(),
1130                    },
1131                ];
1132                metadata
1133            },
1134            normalize: linear_payload,
1135        }),
1136        Arc::new(BuiltinProviderSchema {
1137            provider_id: "notion",
1138            harn_schema_name: "NotionEventPayload",
1139            metadata: {
1140                let mut metadata = provider_metadata_entry(
1141                    "notion",
1142                    &["webhook", "poll"],
1143                    "NotionEventPayload",
1144                    &[],
1145                    hmac_signature_metadata(
1146                        "notion",
1147                        "X-Notion-Signature",
1148                        None,
1149                        None,
1150                        None,
1151                        "hex",
1152                    ),
1153                    vec![required_secret("verification_token", "notion")],
1154                    ProviderRuntimeMetadata::Builtin {
1155                        connector: "notion".to_string(),
1156                        default_signature_variant: Some("notion".to_string()),
1157                    },
1158                );
1159                metadata.outbound_methods = vec![
1160                    outbound_method("get_page"),
1161                    outbound_method("update_page"),
1162                    outbound_method("append_blocks"),
1163                    outbound_method("query_database"),
1164                    outbound_method("search"),
1165                    outbound_method("create_comment"),
1166                    outbound_method("api_call"),
1167                ];
1168                metadata
1169            },
1170            normalize: notion_payload,
1171        }),
1172        Arc::new(BuiltinProviderSchema {
1173            provider_id: "cron",
1174            harn_schema_name: "CronEventPayload",
1175            metadata: provider_metadata_entry(
1176                "cron",
1177                &["cron"],
1178                "CronEventPayload",
1179                &[],
1180                SignatureVerificationMetadata::None,
1181                Vec::new(),
1182                ProviderRuntimeMetadata::Builtin {
1183                    connector: "cron".to_string(),
1184                    default_signature_variant: None,
1185                },
1186            ),
1187            normalize: cron_payload,
1188        }),
1189        Arc::new(BuiltinProviderSchema {
1190            provider_id: "webhook",
1191            harn_schema_name: "GenericWebhookPayload",
1192            metadata: provider_metadata_entry(
1193                "webhook",
1194                &["webhook"],
1195                "GenericWebhookPayload",
1196                &[],
1197                hmac_signature_metadata(
1198                    "standard",
1199                    "webhook-signature",
1200                    Some("webhook-timestamp"),
1201                    Some("webhook-id"),
1202                    Some(300),
1203                    "base64",
1204                ),
1205                vec![required_secret("signing_secret", "webhook")],
1206                ProviderRuntimeMetadata::Builtin {
1207                    connector: "webhook".to_string(),
1208                    default_signature_variant: Some("standard".to_string()),
1209                },
1210            ),
1211            normalize: webhook_payload,
1212        }),
1213        Arc::new(BuiltinProviderSchema {
1214            provider_id: "a2a-push",
1215            harn_schema_name: "A2aPushPayload",
1216            metadata: provider_metadata_entry(
1217                "a2a-push",
1218                &["a2a-push"],
1219                "A2aPushPayload",
1220                &[],
1221                SignatureVerificationMetadata::None,
1222                Vec::new(),
1223                ProviderRuntimeMetadata::Placeholder,
1224            ),
1225            normalize: a2a_push_payload,
1226        }),
1227    ]
1228}
1229
1230fn github_payload(
1231    kind: &str,
1232    headers: &BTreeMap<String, String>,
1233    raw: JsonValue,
1234) -> ProviderPayload {
1235    let common = GitHubEventCommon {
1236        event: kind.to_string(),
1237        action: raw
1238            .get("action")
1239            .and_then(JsonValue::as_str)
1240            .map(ToString::to_string),
1241        delivery_id: headers.get("X-GitHub-Delivery").cloned(),
1242        installation_id: raw
1243            .get("installation")
1244            .and_then(|value| value.get("id"))
1245            .and_then(JsonValue::as_i64),
1246        raw: raw.clone(),
1247    };
1248    let payload = match kind {
1249        "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1250            common,
1251            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1252        }),
1253        "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1254            common,
1255            pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1256        }),
1257        "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1258            common,
1259            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1260            comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1261        }),
1262        "pull_request_review" => {
1263            GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1264                common,
1265                pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1266                review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1267            })
1268        }
1269        "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1270            common,
1271            commits: raw
1272                .get("commits")
1273                .and_then(JsonValue::as_array)
1274                .cloned()
1275                .unwrap_or_default(),
1276            distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1277        }),
1278        "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1279            common,
1280            workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1281        }),
1282        _ => GitHubEventPayload::Other(common),
1283    };
1284    ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1285}
1286
1287fn slack_payload(
1288    kind: &str,
1289    _headers: &BTreeMap<String, String>,
1290    raw: JsonValue,
1291) -> ProviderPayload {
1292    let event = raw.get("event");
1293    let common = SlackEventCommon {
1294        event: kind.to_string(),
1295        event_id: raw
1296            .get("event_id")
1297            .and_then(JsonValue::as_str)
1298            .map(ToString::to_string),
1299        api_app_id: raw
1300            .get("api_app_id")
1301            .and_then(JsonValue::as_str)
1302            .map(ToString::to_string),
1303        team_id: raw
1304            .get("team_id")
1305            .and_then(JsonValue::as_str)
1306            .map(ToString::to_string),
1307        channel_id: slack_channel_id(event),
1308        user_id: slack_user_id(event),
1309        event_ts: event
1310            .and_then(|value| value.get("event_ts"))
1311            .and_then(JsonValue::as_str)
1312            .map(ToString::to_string),
1313        raw: raw.clone(),
1314    };
1315    let payload = match kind {
1316        kind if kind == "message" || kind.starts_with("message.") => {
1317            SlackEventPayload::Message(SlackMessageEventPayload {
1318                subtype: event
1319                    .and_then(|value| value.get("subtype"))
1320                    .and_then(JsonValue::as_str)
1321                    .map(ToString::to_string),
1322                channel_type: event
1323                    .and_then(|value| value.get("channel_type"))
1324                    .and_then(JsonValue::as_str)
1325                    .map(ToString::to_string),
1326                channel: event
1327                    .and_then(|value| value.get("channel"))
1328                    .and_then(JsonValue::as_str)
1329                    .map(ToString::to_string),
1330                user: event
1331                    .and_then(|value| value.get("user"))
1332                    .and_then(JsonValue::as_str)
1333                    .map(ToString::to_string),
1334                text: event
1335                    .and_then(|value| value.get("text"))
1336                    .and_then(JsonValue::as_str)
1337                    .map(ToString::to_string),
1338                ts: event
1339                    .and_then(|value| value.get("ts"))
1340                    .and_then(JsonValue::as_str)
1341                    .map(ToString::to_string),
1342                thread_ts: event
1343                    .and_then(|value| value.get("thread_ts"))
1344                    .and_then(JsonValue::as_str)
1345                    .map(ToString::to_string),
1346                common,
1347            })
1348        }
1349        "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1350            channel: event
1351                .and_then(|value| value.get("channel"))
1352                .and_then(JsonValue::as_str)
1353                .map(ToString::to_string),
1354            user: event
1355                .and_then(|value| value.get("user"))
1356                .and_then(JsonValue::as_str)
1357                .map(ToString::to_string),
1358            text: event
1359                .and_then(|value| value.get("text"))
1360                .and_then(JsonValue::as_str)
1361                .map(ToString::to_string),
1362            ts: event
1363                .and_then(|value| value.get("ts"))
1364                .and_then(JsonValue::as_str)
1365                .map(ToString::to_string),
1366            thread_ts: event
1367                .and_then(|value| value.get("thread_ts"))
1368                .and_then(JsonValue::as_str)
1369                .map(ToString::to_string),
1370            common,
1371        }),
1372        "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1373            reaction: event
1374                .and_then(|value| value.get("reaction"))
1375                .and_then(JsonValue::as_str)
1376                .map(ToString::to_string),
1377            item_user: event
1378                .and_then(|value| value.get("item_user"))
1379                .and_then(JsonValue::as_str)
1380                .map(ToString::to_string),
1381            item: event
1382                .and_then(|value| value.get("item"))
1383                .cloned()
1384                .unwrap_or(JsonValue::Null),
1385            common,
1386        }),
1387        "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1388            user: event
1389                .and_then(|value| value.get("user"))
1390                .and_then(JsonValue::as_str)
1391                .map(ToString::to_string),
1392            channel: event
1393                .and_then(|value| value.get("channel"))
1394                .and_then(JsonValue::as_str)
1395                .map(ToString::to_string),
1396            tab: event
1397                .and_then(|value| value.get("tab"))
1398                .and_then(JsonValue::as_str)
1399                .map(ToString::to_string),
1400            view: event
1401                .and_then(|value| value.get("view"))
1402                .cloned()
1403                .unwrap_or(JsonValue::Null),
1404            common,
1405        }),
1406        "assistant_thread_started" => {
1407            let assistant_thread = event
1408                .and_then(|value| value.get("assistant_thread"))
1409                .cloned()
1410                .unwrap_or(JsonValue::Null);
1411            SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1412                thread_ts: assistant_thread
1413                    .get("thread_ts")
1414                    .and_then(JsonValue::as_str)
1415                    .map(ToString::to_string),
1416                context: assistant_thread
1417                    .get("context")
1418                    .cloned()
1419                    .unwrap_or(JsonValue::Null),
1420                assistant_thread,
1421                common,
1422            })
1423        }
1424        _ => SlackEventPayload::Other(common),
1425    };
1426    ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1427}
1428
1429fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1430    event
1431        .and_then(|value| value.get("channel"))
1432        .and_then(JsonValue::as_str)
1433        .map(ToString::to_string)
1434        .or_else(|| {
1435            event
1436                .and_then(|value| value.get("item"))
1437                .and_then(|value| value.get("channel"))
1438                .and_then(JsonValue::as_str)
1439                .map(ToString::to_string)
1440        })
1441        .or_else(|| {
1442            event
1443                .and_then(|value| value.get("channel"))
1444                .and_then(|value| value.get("id"))
1445                .and_then(JsonValue::as_str)
1446                .map(ToString::to_string)
1447        })
1448        .or_else(|| {
1449            event
1450                .and_then(|value| value.get("assistant_thread"))
1451                .and_then(|value| value.get("channel_id"))
1452                .and_then(JsonValue::as_str)
1453                .map(ToString::to_string)
1454        })
1455}
1456
1457fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1458    event
1459        .and_then(|value| value.get("user"))
1460        .and_then(JsonValue::as_str)
1461        .map(ToString::to_string)
1462        .or_else(|| {
1463            event
1464                .and_then(|value| value.get("user"))
1465                .and_then(|value| value.get("id"))
1466                .and_then(JsonValue::as_str)
1467                .map(ToString::to_string)
1468        })
1469        .or_else(|| {
1470            event
1471                .and_then(|value| value.get("item_user"))
1472                .and_then(JsonValue::as_str)
1473                .map(ToString::to_string)
1474        })
1475        .or_else(|| {
1476            event
1477                .and_then(|value| value.get("assistant_thread"))
1478                .and_then(|value| value.get("user_id"))
1479                .and_then(JsonValue::as_str)
1480                .map(ToString::to_string)
1481        })
1482}
1483
1484fn linear_payload(
1485    _kind: &str,
1486    headers: &BTreeMap<String, String>,
1487    raw: JsonValue,
1488) -> ProviderPayload {
1489    let common = linear_event_common(headers, &raw);
1490    let event = common.event.clone();
1491    let payload = match event.as_str() {
1492        "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1493            common,
1494            issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1495            changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1496        }),
1497        "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1498            common,
1499            comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1500        }),
1501        "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1502            common,
1503            label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1504        }),
1505        "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1506            common,
1507            project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1508        }),
1509        "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1510            common,
1511            cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1512        }),
1513        "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
1514            common,
1515            customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1516        }),
1517        "customer_request" => {
1518            LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
1519                common,
1520                customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1521            })
1522        }
1523        _ => LinearEventPayload::Other(common),
1524    };
1525    ProviderPayload::Known(KnownProviderPayload::Linear(payload))
1526}
1527
1528fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
1529    LinearEventCommon {
1530        event: linear_event_name(
1531            raw.get("type")
1532                .and_then(JsonValue::as_str)
1533                .or_else(|| headers.get("Linear-Event").map(String::as_str)),
1534        ),
1535        action: raw
1536            .get("action")
1537            .and_then(JsonValue::as_str)
1538            .map(ToString::to_string),
1539        delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
1540        organization_id: raw
1541            .get("organizationId")
1542            .and_then(JsonValue::as_str)
1543            .map(ToString::to_string),
1544        webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
1545        webhook_id: raw
1546            .get("webhookId")
1547            .and_then(JsonValue::as_str)
1548            .map(ToString::to_string),
1549        url: raw
1550            .get("url")
1551            .and_then(JsonValue::as_str)
1552            .map(ToString::to_string),
1553        created_at: raw
1554            .get("createdAt")
1555            .and_then(JsonValue::as_str)
1556            .map(ToString::to_string),
1557        actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
1558        raw: raw.clone(),
1559    }
1560}
1561
/// Maps a Linear event type to its canonical lowercase name.
///
/// Matching is ASCII-case-insensitive. Known aliases collapse to one name
/// (e.g. `IssueComment` -> `comment`, `ProjectUpdate` -> `project`); an
/// unrecognized non-empty type is returned lowercased; a missing or empty type
/// yields `"other"`.
fn linear_event_name(raw_type: Option<&str>) -> String {
    let lowered = raw_type.map(str::to_ascii_lowercase).unwrap_or_default();
    let canonical = match lowered.as_str() {
        "" => "other",
        "issue" => "issue",
        "comment" | "issuecomment" | "issue_comment" => "comment",
        "issuelabel" | "issue_label" => "issue_label",
        "project" | "projectupdate" | "project_update" => "project",
        "cycle" => "cycle",
        "customer" => "customer",
        "customerrequest" | "customer_request" => "customer_request",
        unrecognized => unrecognized,
    };
    canonical.to_string()
}
1575
1576fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
1577    let Some(JsonValue::Object(fields)) = updated_from else {
1578        return Vec::new();
1579    };
1580    let mut changes = Vec::new();
1581    for (field, previous) in fields {
1582        let change = match field.as_str() {
1583            "title" => LinearIssueChange::Title {
1584                previous: previous.as_str().map(ToString::to_string),
1585            },
1586            "description" => LinearIssueChange::Description {
1587                previous: previous.as_str().map(ToString::to_string),
1588            },
1589            "priority" => LinearIssueChange::Priority {
1590                previous: parse_json_i64ish(previous),
1591            },
1592            "estimate" => LinearIssueChange::Estimate {
1593                previous: parse_json_i64ish(previous),
1594            },
1595            "stateId" => LinearIssueChange::StateId {
1596                previous: previous.as_str().map(ToString::to_string),
1597            },
1598            "teamId" => LinearIssueChange::TeamId {
1599                previous: previous.as_str().map(ToString::to_string),
1600            },
1601            "assigneeId" => LinearIssueChange::AssigneeId {
1602                previous: previous.as_str().map(ToString::to_string),
1603            },
1604            "projectId" => LinearIssueChange::ProjectId {
1605                previous: previous.as_str().map(ToString::to_string),
1606            },
1607            "cycleId" => LinearIssueChange::CycleId {
1608                previous: previous.as_str().map(ToString::to_string),
1609            },
1610            "dueDate" => LinearIssueChange::DueDate {
1611                previous: previous.as_str().map(ToString::to_string),
1612            },
1613            "parentId" => LinearIssueChange::ParentId {
1614                previous: previous.as_str().map(ToString::to_string),
1615            },
1616            "sortOrder" => LinearIssueChange::SortOrder {
1617                previous: previous.as_f64(),
1618            },
1619            "labelIds" => LinearIssueChange::LabelIds {
1620                previous: parse_string_array(previous),
1621            },
1622            "completedAt" => LinearIssueChange::CompletedAt {
1623                previous: previous.as_str().map(ToString::to_string),
1624            },
1625            _ => LinearIssueChange::Other {
1626                field: field.clone(),
1627                previous: previous.clone(),
1628            },
1629        };
1630        changes.push(change);
1631    }
1632    changes
1633}
1634
1635fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
1636    value
1637        .as_i64()
1638        .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
1639        .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
1640}
1641
1642fn parse_string_array(value: &JsonValue) -> Vec<String> {
1643    let Some(array) = value.as_array() else {
1644        return Vec::new();
1645    };
1646    array
1647        .iter()
1648        .filter_map(|entry| {
1649            entry.as_str().map(ToString::to_string).or_else(|| {
1650                entry
1651                    .get("id")
1652                    .and_then(JsonValue::as_str)
1653                    .map(ToString::to_string)
1654            })
1655        })
1656        .collect()
1657}
1658
/// Returns the value of the first header whose name equals `name`, ignoring
/// ASCII case, or `None` when no header matches.
///
/// Headers are scanned in the map's key order, so when several keys differ
/// only by case the one that sorts first wins.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
1665
1666fn notion_payload(
1667    kind: &str,
1668    headers: &BTreeMap<String, String>,
1669    raw: JsonValue,
1670) -> ProviderPayload {
1671    let workspace_id = raw
1672        .get("workspace_id")
1673        .and_then(JsonValue::as_str)
1674        .map(ToString::to_string);
1675    ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
1676        event: kind.to_string(),
1677        workspace_id,
1678        request_id: headers
1679            .get("request-id")
1680            .cloned()
1681            .or_else(|| headers.get("x-request-id").cloned()),
1682        subscription_id: raw
1683            .get("subscription_id")
1684            .and_then(JsonValue::as_str)
1685            .map(ToString::to_string),
1686        integration_id: raw
1687            .get("integration_id")
1688            .and_then(JsonValue::as_str)
1689            .map(ToString::to_string),
1690        attempt_number: raw
1691            .get("attempt_number")
1692            .and_then(JsonValue::as_u64)
1693            .and_then(|value| u32::try_from(value).ok()),
1694        entity_id: raw
1695            .get("entity")
1696            .and_then(|value| value.get("id"))
1697            .and_then(JsonValue::as_str)
1698            .map(ToString::to_string),
1699        entity_type: raw
1700            .get("entity")
1701            .and_then(|value| value.get("type"))
1702            .and_then(JsonValue::as_str)
1703            .map(ToString::to_string),
1704        api_version: raw
1705            .get("api_version")
1706            .and_then(JsonValue::as_str)
1707            .map(ToString::to_string),
1708        verification_token: raw
1709            .get("verification_token")
1710            .and_then(JsonValue::as_str)
1711            .map(ToString::to_string),
1712        polled: None,
1713        raw,
1714    })))
1715}
1716
1717fn cron_payload(
1718    _kind: &str,
1719    _headers: &BTreeMap<String, String>,
1720    raw: JsonValue,
1721) -> ProviderPayload {
1722    let cron_id = raw
1723        .get("cron_id")
1724        .and_then(JsonValue::as_str)
1725        .map(ToString::to_string);
1726    let schedule = raw
1727        .get("schedule")
1728        .and_then(JsonValue::as_str)
1729        .map(ToString::to_string);
1730    let tick_at = raw
1731        .get("tick_at")
1732        .and_then(JsonValue::as_str)
1733        .and_then(parse_rfc3339)
1734        .unwrap_or_else(OffsetDateTime::now_utc);
1735    ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
1736        cron_id,
1737        schedule,
1738        tick_at,
1739        raw,
1740    }))
1741}
1742
1743fn webhook_payload(
1744    _kind: &str,
1745    headers: &BTreeMap<String, String>,
1746    raw: JsonValue,
1747) -> ProviderPayload {
1748    ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1749        source: headers.get("X-Webhook-Source").cloned(),
1750        content_type: headers.get("Content-Type").cloned(),
1751        raw,
1752    }))
1753}
1754
1755fn a2a_push_payload(
1756    _kind: &str,
1757    _headers: &BTreeMap<String, String>,
1758    raw: JsonValue,
1759) -> ProviderPayload {
1760    let task_id = raw
1761        .get("task_id")
1762        .and_then(JsonValue::as_str)
1763        .map(ToString::to_string);
1764    let sender = raw
1765        .get("sender")
1766        .and_then(JsonValue::as_str)
1767        .map(ToString::to_string);
1768    ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
1769        task_id,
1770        sender,
1771        raw,
1772    }))
1773}
1774
1775fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
1776    OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
1777}
1778
#[cfg(test)]
mod tests {
    use super::*;

    /// Header fixture shared by the tests: three credential-style headers plus three
    /// delivery-metadata headers.
    fn sample_headers() -> BTreeMap<String, String> {
        [
            ("Authorization", "Bearer secret"),
            ("Cookie", "session=abc"),
            ("User-Agent", "GitHub-Hookshot/123"),
            ("X-GitHub-Delivery", "delivery-123"),
            ("X-GitHub-Event", "issues"),
            ("X-Webhook-Token", "token"),
        ]
        .iter()
        .map(|(name, value)| (name.to_string(), value.to_string()))
        .collect()
    }

    /// The default policy passes `User-Agent` and `X-GitHub-Delivery` through untouched and
    /// replaces `Authorization`, `Cookie` and `X-Webhook-Token` with the redaction marker.
    #[test]
    fn default_redaction_policy_keeps_safe_headers() {
        let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());

        let kept = [
            ("User-Agent", "GitHub-Hookshot/123"),
            ("X-GitHub-Delivery", "delivery-123"),
        ];
        for (name, expected) in kept {
            assert_eq!(redacted.get(name).unwrap(), expected);
        }

        for name in ["Authorization", "Cookie", "X-Webhook-Token"] {
            assert_eq!(redacted.get(name).unwrap(), REDACTED_HEADER_VALUE);
        }
    }

    /// Registering a second schema under an already-registered provider id is rejected with
    /// `DuplicateProvider`.
    #[test]
    fn provider_catalog_rejects_duplicates() {
        // Builds a fresh, identical GitHub schema on every call.
        let github_schema = || {
            Arc::new(BuiltinProviderSchema {
                provider_id: "github",
                harn_schema_name: "GitHubEventPayload",
                metadata: provider_metadata_entry(
                    "github",
                    &["webhook"],
                    "GitHubEventPayload",
                    &[],
                    SignatureVerificationMetadata::None,
                    Vec::new(),
                    ProviderRuntimeMetadata::Placeholder,
                ),
                normalize: github_payload,
            })
        };

        let mut catalog = ProviderCatalog::default();
        catalog.register(github_schema()).unwrap();

        let error = catalog.register(github_schema()).unwrap_err();
        assert_eq!(
            error,
            ProviderCatalogError::DuplicateProvider("github".to_string())
        );
    }

    /// Exactly six registered providers report a `Builtin` runtime, and they are the ones
    /// named below.
    #[test]
    fn registered_provider_metadata_marks_builtin_connectors() {
        let entries = registered_provider_metadata();
        let builtin: Vec<&ProviderMetadata> = entries
            .iter()
            .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
            .collect();
        assert_eq!(builtin.len(), 6);

        // Collect the absent names so a failure reports which connector is missing.
        let expected = ["cron", "github", "linear", "notion", "slack", "webhook"];
        let missing: Vec<&str> = expected
            .iter()
            .copied()
            .filter(|&name| !builtin.iter().any(|entry| entry.provider == name))
            .collect();
        assert_eq!(missing, Vec::<&str>::new());
    }

    /// Serializing an event, decoding it, and serializing again yields an equal event and
    /// identical JSON.
    #[test]
    fn trigger_event_round_trip_is_stable() {
        let provider = ProviderId::from("github");
        let raw_headers = sample_headers();
        let body = serde_json::json!({
            "action": "opened",
            "installation": {"id": 42},
            "issue": {"number": 99}
        });
        let provider_payload =
            ProviderPayload::normalize(&provider, "issues", &raw_headers, body).unwrap();
        let headers = redact_headers(&raw_headers, &HeaderRedactionPolicy::default());

        let event = TriggerEvent {
            id: TriggerEventId("trigger_evt_fixed".to_string()),
            provider,
            kind: "issues".to_string(),
            received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
            occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
            dedupe_key: "delivery-123".to_string(),
            trace_id: TraceId("trace_fixed".to_string()),
            tenant_id: Some(TenantId("tenant_1".to_string())),
            headers,
            provider_payload,
            signature_status: SignatureStatus::Verified,
            dedupe_claimed: false,
            batch: None,
        };

        let first_pass = serde_json::to_value(&event).unwrap();
        let decoded: TriggerEvent = serde_json::from_value(first_pass.clone()).unwrap();
        let second_pass = serde_json::to_value(&decoded).unwrap();

        assert_eq!(decoded, event);
        assert_eq!(first_pass, second_pass);
    }

    /// Normalizing a payload for a provider id that is not registered returns
    /// `UnknownProvider`.
    #[test]
    fn unknown_provider_errors() {
        let provider = ProviderId::from("custom-provider");
        let result = ProviderPayload::normalize(
            &provider,
            "thing.happened",
            &BTreeMap::new(),
            serde_json::json!({"ok": true}),
        );

        assert_eq!(
            result.unwrap_err(),
            ProviderCatalogError::UnknownProvider("custom-provider".to_string())
        );
    }
}