Skip to main content

harn_vm/triggers/
event.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Serialize};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
11const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
14#[serde(transparent)]
15pub struct TriggerEventId(pub String);
16
17impl TriggerEventId {
18    pub fn new() -> Self {
19        Self(format!("trigger_evt_{}", Uuid::now_v7()))
20    }
21}
22
23impl Default for TriggerEventId {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
30#[serde(transparent)]
31pub struct ProviderId(pub String);
32
33impl ProviderId {
34    pub fn new(value: impl Into<String>) -> Self {
35        Self(value.into())
36    }
37
38    pub fn as_str(&self) -> &str {
39        self.0.as_str()
40    }
41}
42
43impl From<&str> for ProviderId {
44    fn from(value: &str) -> Self {
45        Self::new(value)
46    }
47}
48
49impl From<String> for ProviderId {
50    fn from(value: String) -> Self {
51        Self::new(value)
52    }
53}
54
55#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
56#[serde(transparent)]
57pub struct TraceId(pub String);
58
59impl TraceId {
60    pub fn new() -> Self {
61        Self(format!("trace_{}", Uuid::now_v7()))
62    }
63}
64
65impl Default for TraceId {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
71#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
72#[serde(transparent)]
73pub struct TenantId(pub String);
74
75impl TenantId {
76    pub fn new(value: impl Into<String>) -> Self {
77        Self(value.into())
78    }
79}
80
81#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
82#[serde(tag = "state", rename_all = "snake_case")]
83pub enum SignatureStatus {
84    Verified,
85    Unsigned,
86    Failed { reason: String },
87}
88
89#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
90pub struct GitHubEventCommon {
91    pub event: String,
92    pub action: Option<String>,
93    pub delivery_id: Option<String>,
94    pub installation_id: Option<i64>,
95    pub raw: JsonValue,
96}
97
98#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
99pub struct GitHubIssuesEventPayload {
100    #[serde(flatten)]
101    pub common: GitHubEventCommon,
102    pub issue: JsonValue,
103}
104
105#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
106pub struct GitHubPullRequestEventPayload {
107    #[serde(flatten)]
108    pub common: GitHubEventCommon,
109    pub pull_request: JsonValue,
110}
111
112#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
113pub struct GitHubIssueCommentEventPayload {
114    #[serde(flatten)]
115    pub common: GitHubEventCommon,
116    pub issue: JsonValue,
117    pub comment: JsonValue,
118}
119
120#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
121pub struct GitHubPullRequestReviewEventPayload {
122    #[serde(flatten)]
123    pub common: GitHubEventCommon,
124    pub pull_request: JsonValue,
125    pub review: JsonValue,
126}
127
128#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
129pub struct GitHubPushEventPayload {
130    #[serde(flatten)]
131    pub common: GitHubEventCommon,
132    #[serde(default)]
133    pub commits: Vec<JsonValue>,
134    pub distinct_size: Option<i64>,
135}
136
137#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
138pub struct GitHubWorkflowRunEventPayload {
139    #[serde(flatten)]
140    pub common: GitHubEventCommon,
141    pub workflow_run: JsonValue,
142}
143
144#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
145#[serde(untagged)]
146pub enum GitHubEventPayload {
147    Issues(GitHubIssuesEventPayload),
148    PullRequest(GitHubPullRequestEventPayload),
149    IssueComment(GitHubIssueCommentEventPayload),
150    PullRequestReview(GitHubPullRequestReviewEventPayload),
151    Push(GitHubPushEventPayload),
152    WorkflowRun(GitHubWorkflowRunEventPayload),
153    Other(GitHubEventCommon),
154}
155
156#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
157pub struct SlackEventCommon {
158    pub event: String,
159    pub event_id: Option<String>,
160    pub api_app_id: Option<String>,
161    pub team_id: Option<String>,
162    pub channel_id: Option<String>,
163    pub user_id: Option<String>,
164    pub event_ts: Option<String>,
165    pub raw: JsonValue,
166}
167
168#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
169pub struct SlackMessageChannelsEventPayload {
170    #[serde(flatten)]
171    pub common: SlackEventCommon,
172    pub subtype: Option<String>,
173    pub channel: Option<String>,
174    pub user: Option<String>,
175    pub text: Option<String>,
176    pub ts: Option<String>,
177    pub thread_ts: Option<String>,
178}
179
180#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
181pub struct SlackAppMentionEventPayload {
182    #[serde(flatten)]
183    pub common: SlackEventCommon,
184    pub channel: Option<String>,
185    pub user: Option<String>,
186    pub text: Option<String>,
187    pub ts: Option<String>,
188    pub thread_ts: Option<String>,
189}
190
191#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
192pub struct SlackReactionAddedEventPayload {
193    #[serde(flatten)]
194    pub common: SlackEventCommon,
195    pub reaction: Option<String>,
196    pub item_user: Option<String>,
197    pub item: JsonValue,
198}
199
200#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
201pub struct SlackTeamJoinEventPayload {
202    #[serde(flatten)]
203    pub common: SlackEventCommon,
204    pub user: JsonValue,
205}
206
207#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
208pub struct SlackChannelCreatedEventPayload {
209    #[serde(flatten)]
210    pub common: SlackEventCommon,
211    pub channel: JsonValue,
212}
213
214#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
215#[serde(untagged)]
216pub enum SlackEventPayload {
217    MessageChannels(SlackMessageChannelsEventPayload),
218    AppMention(SlackAppMentionEventPayload),
219    ReactionAdded(SlackReactionAddedEventPayload),
220    TeamJoin(SlackTeamJoinEventPayload),
221    ChannelCreated(SlackChannelCreatedEventPayload),
222    Other(SlackEventCommon),
223}
224
225#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
226pub struct LinearEventPayload {
227    pub action: Option<String>,
228    pub organization_id: Option<String>,
229    pub webhook_timestamp: Option<String>,
230    pub raw: JsonValue,
231}
232
233#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
234pub struct NotionEventPayload {
235    pub event: String,
236    pub workspace_id: Option<String>,
237    pub request_id: Option<String>,
238    pub raw: JsonValue,
239}
240
241#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
242pub struct CronEventPayload {
243    pub cron_id: Option<String>,
244    pub schedule: Option<String>,
245    #[serde(with = "time::serde::rfc3339")]
246    pub tick_at: OffsetDateTime,
247    pub raw: JsonValue,
248}
249
250#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
251pub struct GenericWebhookPayload {
252    pub source: Option<String>,
253    pub content_type: Option<String>,
254    pub raw: JsonValue,
255}
256
257#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
258pub struct A2aPushPayload {
259    pub task_id: Option<String>,
260    pub sender: Option<String>,
261    pub raw: JsonValue,
262}
263
264#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
265pub struct ExtensionProviderPayload {
266    pub provider: String,
267    pub schema_name: String,
268    pub raw: JsonValue,
269}
270
271#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
272#[serde(untagged)]
273pub enum ProviderPayload {
274    Known(KnownProviderPayload),
275    Extension(ExtensionProviderPayload),
276}
277
278impl ProviderPayload {
279    pub fn provider(&self) -> &str {
280        match self {
281            Self::Known(known) => known.provider(),
282            Self::Extension(payload) => payload.provider.as_str(),
283        }
284    }
285
286    pub fn normalize(
287        provider: &ProviderId,
288        kind: &str,
289        headers: &BTreeMap<String, String>,
290        raw: JsonValue,
291    ) -> Result<Self, ProviderCatalogError> {
292        provider_catalog()
293            .read()
294            .expect("provider catalog poisoned")
295            .normalize(provider, kind, headers, raw)
296    }
297}
298
299#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
300#[serde(tag = "provider")]
301pub enum KnownProviderPayload {
302    #[serde(rename = "github")]
303    GitHub(GitHubEventPayload),
304    #[serde(rename = "slack")]
305    Slack(Box<SlackEventPayload>),
306    #[serde(rename = "linear")]
307    Linear(LinearEventPayload),
308    #[serde(rename = "notion")]
309    Notion(NotionEventPayload),
310    #[serde(rename = "cron")]
311    Cron(CronEventPayload),
312    #[serde(rename = "webhook")]
313    Webhook(GenericWebhookPayload),
314    #[serde(rename = "a2a-push")]
315    A2aPush(A2aPushPayload),
316}
317
318impl KnownProviderPayload {
319    pub fn provider(&self) -> &str {
320        match self {
321            Self::GitHub(_) => "github",
322            Self::Slack(_) => "slack",
323            Self::Linear(_) => "linear",
324            Self::Notion(_) => "notion",
325            Self::Cron(_) => "cron",
326            Self::Webhook(_) => "webhook",
327            Self::A2aPush(_) => "a2a-push",
328        }
329    }
330}
331
332#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
333pub struct TriggerEvent {
334    pub id: TriggerEventId,
335    pub provider: ProviderId,
336    pub kind: String,
337    #[serde(with = "time::serde::rfc3339")]
338    pub received_at: OffsetDateTime,
339    #[serde(with = "time::serde::rfc3339::option")]
340    pub occurred_at: Option<OffsetDateTime>,
341    pub dedupe_key: String,
342    pub trace_id: TraceId,
343    pub tenant_id: Option<TenantId>,
344    pub headers: BTreeMap<String, String>,
345    #[serde(default, skip_serializing_if = "Option::is_none")]
346    pub batch: Option<Vec<JsonValue>>,
347    pub provider_payload: ProviderPayload,
348    pub signature_status: SignatureStatus,
349    #[serde(skip)]
350    pub dedupe_claimed: bool,
351}
352
353impl TriggerEvent {
354    #[allow(clippy::too_many_arguments)]
355    pub fn new(
356        provider: ProviderId,
357        kind: impl Into<String>,
358        occurred_at: Option<OffsetDateTime>,
359        dedupe_key: impl Into<String>,
360        tenant_id: Option<TenantId>,
361        headers: BTreeMap<String, String>,
362        provider_payload: ProviderPayload,
363        signature_status: SignatureStatus,
364    ) -> Self {
365        Self {
366            id: TriggerEventId::new(),
367            provider,
368            kind: kind.into(),
369            received_at: clock::now_utc(),
370            occurred_at,
371            dedupe_key: dedupe_key.into(),
372            trace_id: TraceId::new(),
373            tenant_id,
374            headers,
375            batch: None,
376            provider_payload,
377            signature_status,
378            dedupe_claimed: false,
379        }
380    }
381
382    pub fn dedupe_claimed(&self) -> bool {
383        self.dedupe_claimed
384    }
385
386    pub fn mark_dedupe_claimed(&mut self) {
387        self.dedupe_claimed = true;
388    }
389}
390
391#[derive(Clone, Debug, PartialEq, Eq)]
392pub struct HeaderRedactionPolicy {
393    safe_exact_names: BTreeSet<String>,
394}
395
396impl HeaderRedactionPolicy {
397    pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
398        self.safe_exact_names
399            .insert(name.into().to_ascii_lowercase());
400        self
401    }
402
403    fn should_keep(&self, name: &str) -> bool {
404        let lower = name.to_ascii_lowercase();
405        if self.safe_exact_names.contains(lower.as_str()) {
406            return true;
407        }
408        matches!(
409            lower.as_str(),
410            "user-agent"
411                | "request-id"
412                | "x-request-id"
413                | "x-correlation-id"
414                | "content-type"
415                | "content-length"
416                | "x-github-event"
417                | "x-github-delivery"
418                | "x-github-hook-id"
419                | "x-hub-signature-256"
420                | "x-slack-request-timestamp"
421                | "x-slack-signature"
422                | "x-linear-signature"
423                | "x-notion-signature"
424                | "x-a2a-signature"
425                | "x-a2a-delivery"
426        ) || lower.ends_with("-event")
427            || lower.ends_with("-delivery")
428            || lower.contains("timestamp")
429            || lower.contains("request-id")
430    }
431
432    fn should_redact(&self, name: &str) -> bool {
433        let lower = name.to_ascii_lowercase();
434        if self.should_keep(lower.as_str()) {
435            return false;
436        }
437        lower.contains("authorization")
438            || lower.contains("cookie")
439            || lower.contains("secret")
440            || lower.contains("token")
441            || lower.contains("key")
442    }
443}
444
445impl Default for HeaderRedactionPolicy {
446    fn default() -> Self {
447        Self {
448            safe_exact_names: BTreeSet::from([
449                "content-length".to_string(),
450                "content-type".to_string(),
451                "request-id".to_string(),
452                "user-agent".to_string(),
453                "x-a2a-delivery".to_string(),
454                "x-a2a-signature".to_string(),
455                "x-correlation-id".to_string(),
456                "x-github-delivery".to_string(),
457                "x-github-event".to_string(),
458                "x-github-hook-id".to_string(),
459                "x-hub-signature-256".to_string(),
460                "x-linear-signature".to_string(),
461                "x-notion-signature".to_string(),
462                "x-request-id".to_string(),
463                "x-slack-request-timestamp".to_string(),
464                "x-slack-signature".to_string(),
465            ]),
466        }
467    }
468}
469
470pub fn redact_headers(
471    headers: &BTreeMap<String, String>,
472    policy: &HeaderRedactionPolicy,
473) -> BTreeMap<String, String> {
474    headers
475        .iter()
476        .map(|(name, value)| {
477            if policy.should_redact(name) {
478                (name.clone(), REDACTED_HEADER_VALUE.to_string())
479            } else {
480                (name.clone(), value.clone())
481            }
482        })
483        .collect()
484}
485
486#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
487pub struct ProviderSecretRequirement {
488    pub name: String,
489    pub required: bool,
490    pub namespace: String,
491}
492
493#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
494pub struct ProviderOutboundMethod {
495    pub name: String,
496}
497
498#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
499#[serde(tag = "kind", rename_all = "snake_case")]
500pub enum SignatureVerificationMetadata {
501    #[default]
502    None,
503    Hmac {
504        variant: String,
505        raw_body: bool,
506        signature_header: String,
507        timestamp_header: Option<String>,
508        id_header: Option<String>,
509        default_tolerance_secs: Option<i64>,
510        digest: String,
511        encoding: String,
512    },
513}
514
515#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
516#[serde(tag = "kind", rename_all = "snake_case")]
517pub enum ProviderRuntimeMetadata {
518    Builtin {
519        connector: String,
520        default_signature_variant: Option<String>,
521    },
522    #[default]
523    Placeholder,
524}
525
526#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
527pub struct ProviderMetadata {
528    pub provider: String,
529    #[serde(default)]
530    pub kinds: Vec<String>,
531    pub schema_name: String,
532    #[serde(default)]
533    pub outbound_methods: Vec<ProviderOutboundMethod>,
534    #[serde(default)]
535    pub secret_requirements: Vec<ProviderSecretRequirement>,
536    #[serde(default)]
537    pub signature_verification: SignatureVerificationMetadata,
538    #[serde(default)]
539    pub runtime: ProviderRuntimeMetadata,
540}
541
542impl ProviderMetadata {
543    pub fn supports_kind(&self, kind: &str) -> bool {
544        self.kinds.iter().any(|candidate| candidate == kind)
545    }
546
547    pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
548        self.secret_requirements
549            .iter()
550            .filter(|requirement| requirement.required)
551            .map(|requirement| requirement.name.as_str())
552    }
553}
554
555pub trait ProviderSchema: Send + Sync {
556    fn provider_id(&self) -> &'static str;
557    fn harn_schema_name(&self) -> &'static str;
558    fn metadata(&self) -> ProviderMetadata {
559        ProviderMetadata {
560            provider: self.provider_id().to_string(),
561            schema_name: self.harn_schema_name().to_string(),
562            ..ProviderMetadata::default()
563        }
564    }
565    fn normalize(
566        &self,
567        kind: &str,
568        headers: &BTreeMap<String, String>,
569        raw: JsonValue,
570    ) -> Result<ProviderPayload, ProviderCatalogError>;
571}
572
573#[derive(Clone, Debug, PartialEq, Eq)]
574pub enum ProviderCatalogError {
575    DuplicateProvider(String),
576    UnknownProvider(String),
577}
578
579impl std::fmt::Display for ProviderCatalogError {
580    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
581        match self {
582            Self::DuplicateProvider(provider) => {
583                write!(f, "provider `{provider}` is already registered")
584            }
585            Self::UnknownProvider(provider) => write!(f, "provider `{provider}` is not registered"),
586        }
587    }
588}
589
590impl std::error::Error for ProviderCatalogError {}
591
592#[derive(Clone, Default)]
593pub struct ProviderCatalog {
594    providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
595}
596
597impl ProviderCatalog {
598    pub fn with_defaults() -> Self {
599        let mut catalog = Self::default();
600        for schema in default_provider_schemas() {
601            catalog
602                .register(schema)
603                .expect("default providers must register cleanly");
604        }
605        catalog
606    }
607
608    pub fn register(
609        &mut self,
610        schema: Arc<dyn ProviderSchema>,
611    ) -> Result<(), ProviderCatalogError> {
612        let provider = schema.provider_id().to_string();
613        if self.providers.contains_key(provider.as_str()) {
614            return Err(ProviderCatalogError::DuplicateProvider(provider));
615        }
616        self.providers.insert(provider, schema);
617        Ok(())
618    }
619
620    pub fn normalize(
621        &self,
622        provider: &ProviderId,
623        kind: &str,
624        headers: &BTreeMap<String, String>,
625        raw: JsonValue,
626    ) -> Result<ProviderPayload, ProviderCatalogError> {
627        let schema = self
628            .providers
629            .get(provider.as_str())
630            .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
631        schema.normalize(kind, headers, raw)
632    }
633
634    pub fn schema_names(&self) -> BTreeMap<String, String> {
635        self.providers
636            .iter()
637            .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
638            .collect()
639    }
640
641    pub fn entries(&self) -> Vec<ProviderMetadata> {
642        self.providers
643            .values()
644            .map(|schema| schema.metadata())
645            .collect()
646    }
647
648    pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
649        self.providers.get(provider).map(|schema| schema.metadata())
650    }
651}
652
653pub fn register_provider_schema(
654    schema: Arc<dyn ProviderSchema>,
655) -> Result<(), ProviderCatalogError> {
656    provider_catalog()
657        .write()
658        .expect("provider catalog poisoned")
659        .register(schema)
660}
661
662pub fn reset_provider_catalog() {
663    *provider_catalog()
664        .write()
665        .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
666}
667
668pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
669    provider_catalog()
670        .read()
671        .expect("provider catalog poisoned")
672        .schema_names()
673}
674
675pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
676    provider_catalog()
677        .read()
678        .expect("provider catalog poisoned")
679        .entries()
680}
681
682pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
683    provider_catalog()
684        .read()
685        .expect("provider catalog poisoned")
686        .metadata_for(provider)
687}
688
689fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
690    static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
691    PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
692}
693
694struct BuiltinProviderSchema {
695    provider_id: &'static str,
696    harn_schema_name: &'static str,
697    metadata: ProviderMetadata,
698    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
699}
700
701impl ProviderSchema for BuiltinProviderSchema {
702    fn provider_id(&self) -> &'static str {
703        self.provider_id
704    }
705
706    fn harn_schema_name(&self) -> &'static str {
707        self.harn_schema_name
708    }
709
710    fn metadata(&self) -> ProviderMetadata {
711        self.metadata.clone()
712    }
713
714    fn normalize(
715        &self,
716        kind: &str,
717        headers: &BTreeMap<String, String>,
718        raw: JsonValue,
719    ) -> Result<ProviderPayload, ProviderCatalogError> {
720        Ok((self.normalize)(kind, headers, raw))
721    }
722}
723
724fn provider_metadata_entry(
725    provider: &str,
726    kinds: &[&str],
727    schema_name: &str,
728    signature_verification: SignatureVerificationMetadata,
729    secret_requirements: Vec<ProviderSecretRequirement>,
730    runtime: ProviderRuntimeMetadata,
731) -> ProviderMetadata {
732    ProviderMetadata {
733        provider: provider.to_string(),
734        kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
735        schema_name: schema_name.to_string(),
736        outbound_methods: Vec::new(),
737        secret_requirements,
738        signature_verification,
739        runtime,
740    }
741}
742
743fn hmac_signature_metadata(
744    variant: &str,
745    signature_header: &str,
746    timestamp_header: Option<&str>,
747    id_header: Option<&str>,
748    default_tolerance_secs: Option<i64>,
749    encoding: &str,
750) -> SignatureVerificationMetadata {
751    SignatureVerificationMetadata::Hmac {
752        variant: variant.to_string(),
753        raw_body: true,
754        signature_header: signature_header.to_string(),
755        timestamp_header: timestamp_header.map(ToString::to_string),
756        id_header: id_header.map(ToString::to_string),
757        default_tolerance_secs,
758        digest: "sha256".to_string(),
759        encoding: encoding.to_string(),
760    }
761}
762
763fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
764    ProviderSecretRequirement {
765        name: name.to_string(),
766        required: true,
767        namespace: namespace.to_string(),
768    }
769}
770
771fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
772    vec![
773        Arc::new(BuiltinProviderSchema {
774            provider_id: "github",
775            harn_schema_name: "GitHubEventPayload",
776            metadata: provider_metadata_entry(
777                "github",
778                &["webhook"],
779                "GitHubEventPayload",
780                hmac_signature_metadata(
781                    "github",
782                    "X-Hub-Signature-256",
783                    None,
784                    Some("X-GitHub-Delivery"),
785                    None,
786                    "hex",
787                ),
788                vec![required_secret("signing_secret", "github")],
789                ProviderRuntimeMetadata::Builtin {
790                    connector: "webhook".to_string(),
791                    default_signature_variant: Some("github".to_string()),
792                },
793            ),
794            normalize: github_payload,
795        }),
796        Arc::new(BuiltinProviderSchema {
797            provider_id: "slack",
798            harn_schema_name: "SlackEventPayload",
799            metadata: provider_metadata_entry(
800                "slack",
801                &["webhook"],
802                "SlackEventPayload",
803                hmac_signature_metadata(
804                    "slack",
805                    "X-Slack-Signature",
806                    Some("X-Slack-Request-Timestamp"),
807                    None,
808                    Some(300),
809                    "hex",
810                ),
811                vec![required_secret("signing_secret", "slack")],
812                ProviderRuntimeMetadata::Builtin {
813                    connector: "slack".to_string(),
814                    default_signature_variant: Some("slack".to_string()),
815                },
816            ),
817            normalize: slack_payload,
818        }),
819        Arc::new(BuiltinProviderSchema {
820            provider_id: "linear",
821            harn_schema_name: "LinearEventPayload",
822            metadata: provider_metadata_entry(
823                "linear",
824                &["webhook"],
825                "LinearEventPayload",
826                SignatureVerificationMetadata::None,
827                Vec::new(),
828                ProviderRuntimeMetadata::Placeholder,
829            ),
830            normalize: linear_payload,
831        }),
832        Arc::new(BuiltinProviderSchema {
833            provider_id: "notion",
834            harn_schema_name: "NotionEventPayload",
835            metadata: provider_metadata_entry(
836                "notion",
837                &["webhook"],
838                "NotionEventPayload",
839                SignatureVerificationMetadata::None,
840                Vec::new(),
841                ProviderRuntimeMetadata::Placeholder,
842            ),
843            normalize: notion_payload,
844        }),
845        Arc::new(BuiltinProviderSchema {
846            provider_id: "cron",
847            harn_schema_name: "CronEventPayload",
848            metadata: provider_metadata_entry(
849                "cron",
850                &["cron"],
851                "CronEventPayload",
852                SignatureVerificationMetadata::None,
853                Vec::new(),
854                ProviderRuntimeMetadata::Builtin {
855                    connector: "cron".to_string(),
856                    default_signature_variant: None,
857                },
858            ),
859            normalize: cron_payload,
860        }),
861        Arc::new(BuiltinProviderSchema {
862            provider_id: "webhook",
863            harn_schema_name: "GenericWebhookPayload",
864            metadata: provider_metadata_entry(
865                "webhook",
866                &["webhook"],
867                "GenericWebhookPayload",
868                hmac_signature_metadata(
869                    "standard",
870                    "webhook-signature",
871                    Some("webhook-timestamp"),
872                    Some("webhook-id"),
873                    Some(300),
874                    "base64",
875                ),
876                vec![required_secret("signing_secret", "webhook")],
877                ProviderRuntimeMetadata::Builtin {
878                    connector: "webhook".to_string(),
879                    default_signature_variant: Some("standard".to_string()),
880                },
881            ),
882            normalize: webhook_payload,
883        }),
884        Arc::new(BuiltinProviderSchema {
885            provider_id: "a2a-push",
886            harn_schema_name: "A2aPushPayload",
887            metadata: provider_metadata_entry(
888                "a2a-push",
889                &["a2a-push"],
890                "A2aPushPayload",
891                SignatureVerificationMetadata::None,
892                Vec::new(),
893                ProviderRuntimeMetadata::Placeholder,
894            ),
895            normalize: a2a_push_payload,
896        }),
897    ]
898}
899
900fn github_payload(
901    kind: &str,
902    headers: &BTreeMap<String, String>,
903    raw: JsonValue,
904) -> ProviderPayload {
905    let common = GitHubEventCommon {
906        event: kind.to_string(),
907        action: raw
908            .get("action")
909            .and_then(JsonValue::as_str)
910            .map(ToString::to_string),
911        delivery_id: headers.get("X-GitHub-Delivery").cloned(),
912        installation_id: raw
913            .get("installation")
914            .and_then(|value| value.get("id"))
915            .and_then(JsonValue::as_i64),
916        raw: raw.clone(),
917    };
918    let payload = match kind {
919        "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
920            common,
921            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
922        }),
923        "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
924            common,
925            pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
926        }),
927        "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
928            common,
929            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
930            comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
931        }),
932        "pull_request_review" => {
933            GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
934                common,
935                pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
936                review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
937            })
938        }
939        "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
940            common,
941            commits: raw
942                .get("commits")
943                .and_then(JsonValue::as_array)
944                .cloned()
945                .unwrap_or_default(),
946            distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
947        }),
948        "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
949            common,
950            workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
951        }),
952        _ => GitHubEventPayload::Other(common),
953    };
954    ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
955}
956
957fn slack_payload(
958    kind: &str,
959    _headers: &BTreeMap<String, String>,
960    raw: JsonValue,
961) -> ProviderPayload {
962    let event = raw.get("event");
963    let common = SlackEventCommon {
964        event: kind.to_string(),
965        event_id: raw
966            .get("event_id")
967            .and_then(JsonValue::as_str)
968            .map(ToString::to_string),
969        api_app_id: raw
970            .get("api_app_id")
971            .and_then(JsonValue::as_str)
972            .map(ToString::to_string),
973        team_id: raw
974            .get("team_id")
975            .and_then(JsonValue::as_str)
976            .map(ToString::to_string),
977        channel_id: slack_channel_id(event),
978        user_id: slack_user_id(event),
979        event_ts: event
980            .and_then(|value| value.get("event_ts"))
981            .and_then(JsonValue::as_str)
982            .map(ToString::to_string),
983        raw: raw.clone(),
984    };
985    let payload = match kind {
986        "message.channels" => {
987            SlackEventPayload::MessageChannels(SlackMessageChannelsEventPayload {
988                subtype: event
989                    .and_then(|value| value.get("subtype"))
990                    .and_then(JsonValue::as_str)
991                    .map(ToString::to_string),
992                channel: event
993                    .and_then(|value| value.get("channel"))
994                    .and_then(JsonValue::as_str)
995                    .map(ToString::to_string),
996                user: event
997                    .and_then(|value| value.get("user"))
998                    .and_then(JsonValue::as_str)
999                    .map(ToString::to_string),
1000                text: event
1001                    .and_then(|value| value.get("text"))
1002                    .and_then(JsonValue::as_str)
1003                    .map(ToString::to_string),
1004                ts: event
1005                    .and_then(|value| value.get("ts"))
1006                    .and_then(JsonValue::as_str)
1007                    .map(ToString::to_string),
1008                thread_ts: event
1009                    .and_then(|value| value.get("thread_ts"))
1010                    .and_then(JsonValue::as_str)
1011                    .map(ToString::to_string),
1012                common,
1013            })
1014        }
1015        "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1016            channel: event
1017                .and_then(|value| value.get("channel"))
1018                .and_then(JsonValue::as_str)
1019                .map(ToString::to_string),
1020            user: event
1021                .and_then(|value| value.get("user"))
1022                .and_then(JsonValue::as_str)
1023                .map(ToString::to_string),
1024            text: event
1025                .and_then(|value| value.get("text"))
1026                .and_then(JsonValue::as_str)
1027                .map(ToString::to_string),
1028            ts: event
1029                .and_then(|value| value.get("ts"))
1030                .and_then(JsonValue::as_str)
1031                .map(ToString::to_string),
1032            thread_ts: event
1033                .and_then(|value| value.get("thread_ts"))
1034                .and_then(JsonValue::as_str)
1035                .map(ToString::to_string),
1036            common,
1037        }),
1038        "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1039            reaction: event
1040                .and_then(|value| value.get("reaction"))
1041                .and_then(JsonValue::as_str)
1042                .map(ToString::to_string),
1043            item_user: event
1044                .and_then(|value| value.get("item_user"))
1045                .and_then(JsonValue::as_str)
1046                .map(ToString::to_string),
1047            item: event
1048                .and_then(|value| value.get("item"))
1049                .cloned()
1050                .unwrap_or(JsonValue::Null),
1051            common,
1052        }),
1053        "team_join" => SlackEventPayload::TeamJoin(SlackTeamJoinEventPayload {
1054            user: event
1055                .and_then(|value| value.get("user"))
1056                .cloned()
1057                .unwrap_or(JsonValue::Null),
1058            common,
1059        }),
1060        "channel_created" => SlackEventPayload::ChannelCreated(SlackChannelCreatedEventPayload {
1061            channel: event
1062                .and_then(|value| value.get("channel"))
1063                .cloned()
1064                .unwrap_or(JsonValue::Null),
1065            common,
1066        }),
1067        _ => SlackEventPayload::Other(common),
1068    };
1069    ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1070}
1071
1072fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1073    event
1074        .and_then(|value| value.get("channel"))
1075        .and_then(JsonValue::as_str)
1076        .map(ToString::to_string)
1077        .or_else(|| {
1078            event
1079                .and_then(|value| value.get("item"))
1080                .and_then(|value| value.get("channel"))
1081                .and_then(JsonValue::as_str)
1082                .map(ToString::to_string)
1083        })
1084        .or_else(|| {
1085            event
1086                .and_then(|value| value.get("channel"))
1087                .and_then(|value| value.get("id"))
1088                .and_then(JsonValue::as_str)
1089                .map(ToString::to_string)
1090        })
1091}
1092
1093fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1094    event
1095        .and_then(|value| value.get("user"))
1096        .and_then(JsonValue::as_str)
1097        .map(ToString::to_string)
1098        .or_else(|| {
1099            event
1100                .and_then(|value| value.get("user"))
1101                .and_then(|value| value.get("id"))
1102                .and_then(JsonValue::as_str)
1103                .map(ToString::to_string)
1104        })
1105}
1106
1107fn linear_payload(
1108    _kind: &str,
1109    headers: &BTreeMap<String, String>,
1110    raw: JsonValue,
1111) -> ProviderPayload {
1112    let action = raw
1113        .get("action")
1114        .and_then(JsonValue::as_str)
1115        .map(ToString::to_string);
1116    let organization_id = raw
1117        .get("organizationId")
1118        .and_then(JsonValue::as_str)
1119        .map(ToString::to_string);
1120    ProviderPayload::Known(KnownProviderPayload::Linear(LinearEventPayload {
1121        action,
1122        organization_id,
1123        webhook_timestamp: headers.get("Linear-Request-Timestamp").cloned(),
1124        raw,
1125    }))
1126}
1127
1128fn notion_payload(
1129    kind: &str,
1130    headers: &BTreeMap<String, String>,
1131    raw: JsonValue,
1132) -> ProviderPayload {
1133    let workspace_id = raw
1134        .get("workspace_id")
1135        .and_then(JsonValue::as_str)
1136        .map(ToString::to_string);
1137    ProviderPayload::Known(KnownProviderPayload::Notion(NotionEventPayload {
1138        event: kind.to_string(),
1139        workspace_id,
1140        request_id: headers
1141            .get("request-id")
1142            .cloned()
1143            .or_else(|| headers.get("x-request-id").cloned()),
1144        raw,
1145    }))
1146}
1147
1148fn cron_payload(
1149    _kind: &str,
1150    _headers: &BTreeMap<String, String>,
1151    raw: JsonValue,
1152) -> ProviderPayload {
1153    let cron_id = raw
1154        .get("cron_id")
1155        .and_then(JsonValue::as_str)
1156        .map(ToString::to_string);
1157    let schedule = raw
1158        .get("schedule")
1159        .and_then(JsonValue::as_str)
1160        .map(ToString::to_string);
1161    let tick_at = raw
1162        .get("tick_at")
1163        .and_then(JsonValue::as_str)
1164        .and_then(parse_rfc3339)
1165        .unwrap_or_else(OffsetDateTime::now_utc);
1166    ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
1167        cron_id,
1168        schedule,
1169        tick_at,
1170        raw,
1171    }))
1172}
1173
1174fn webhook_payload(
1175    _kind: &str,
1176    headers: &BTreeMap<String, String>,
1177    raw: JsonValue,
1178) -> ProviderPayload {
1179    ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1180        source: headers.get("X-Webhook-Source").cloned(),
1181        content_type: headers.get("Content-Type").cloned(),
1182        raw,
1183    }))
1184}
1185
1186fn a2a_push_payload(
1187    _kind: &str,
1188    _headers: &BTreeMap<String, String>,
1189    raw: JsonValue,
1190) -> ProviderPayload {
1191    let task_id = raw
1192        .get("task_id")
1193        .and_then(JsonValue::as_str)
1194        .map(ToString::to_string);
1195    let sender = raw
1196        .get("sender")
1197        .and_then(JsonValue::as_str)
1198        .map(ToString::to_string);
1199    ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
1200        task_id,
1201        sender,
1202        raw,
1203    }))
1204}
1205
1206fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
1207    OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
1208}
1209
1210#[cfg(test)]
1211mod tests {
1212    use super::*;
1213
1214    fn sample_headers() -> BTreeMap<String, String> {
1215        BTreeMap::from([
1216            ("Authorization".to_string(), "Bearer secret".to_string()),
1217            ("Cookie".to_string(), "session=abc".to_string()),
1218            ("User-Agent".to_string(), "GitHub-Hookshot/123".to_string()),
1219            ("X-GitHub-Delivery".to_string(), "delivery-123".to_string()),
1220            ("X-GitHub-Event".to_string(), "issues".to_string()),
1221            ("X-Webhook-Token".to_string(), "token".to_string()),
1222        ])
1223    }
1224
1225    #[test]
1226    fn default_redaction_policy_keeps_safe_headers() {
1227        let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
1228        assert_eq!(redacted.get("User-Agent").unwrap(), "GitHub-Hookshot/123");
1229        assert_eq!(redacted.get("X-GitHub-Delivery").unwrap(), "delivery-123");
1230        assert_eq!(
1231            redacted.get("Authorization").unwrap(),
1232            REDACTED_HEADER_VALUE
1233        );
1234        assert_eq!(redacted.get("Cookie").unwrap(), REDACTED_HEADER_VALUE);
1235        assert_eq!(
1236            redacted.get("X-Webhook-Token").unwrap(),
1237            REDACTED_HEADER_VALUE
1238        );
1239    }
1240
1241    #[test]
1242    fn provider_catalog_rejects_duplicates() {
1243        let mut catalog = ProviderCatalog::default();
1244        catalog
1245            .register(Arc::new(BuiltinProviderSchema {
1246                provider_id: "github",
1247                harn_schema_name: "GitHubEventPayload",
1248                metadata: provider_metadata_entry(
1249                    "github",
1250                    &["webhook"],
1251                    "GitHubEventPayload",
1252                    SignatureVerificationMetadata::None,
1253                    Vec::new(),
1254                    ProviderRuntimeMetadata::Placeholder,
1255                ),
1256                normalize: github_payload,
1257            }))
1258            .unwrap();
1259        let error = catalog
1260            .register(Arc::new(BuiltinProviderSchema {
1261                provider_id: "github",
1262                harn_schema_name: "GitHubEventPayload",
1263                metadata: provider_metadata_entry(
1264                    "github",
1265                    &["webhook"],
1266                    "GitHubEventPayload",
1267                    SignatureVerificationMetadata::None,
1268                    Vec::new(),
1269                    ProviderRuntimeMetadata::Placeholder,
1270                ),
1271                normalize: github_payload,
1272            }))
1273            .unwrap_err();
1274        assert_eq!(
1275            error,
1276            ProviderCatalogError::DuplicateProvider("github".to_string())
1277        );
1278    }
1279
1280    #[test]
1281    fn registered_provider_metadata_marks_builtin_connectors() {
1282        let entries = registered_provider_metadata();
1283        let builtin: Vec<&ProviderMetadata> = entries
1284            .iter()
1285            .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
1286            .collect();
1287
1288        assert_eq!(builtin.len(), 4);
1289        assert!(builtin.iter().any(|entry| entry.provider == "cron"));
1290        assert!(builtin.iter().any(|entry| entry.provider == "github"));
1291        assert!(builtin.iter().any(|entry| entry.provider == "slack"));
1292        assert!(builtin.iter().any(|entry| entry.provider == "webhook"));
1293    }
1294
1295    #[test]
1296    fn trigger_event_round_trip_is_stable() {
1297        let provider = ProviderId::from("github");
1298        let headers = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
1299        let payload = ProviderPayload::normalize(
1300            &provider,
1301            "issues",
1302            &sample_headers(),
1303            serde_json::json!({
1304                "action": "opened",
1305                "installation": {"id": 42},
1306                "issue": {"number": 99}
1307            }),
1308        )
1309        .unwrap();
1310        let event = TriggerEvent {
1311            id: TriggerEventId("trigger_evt_fixed".to_string()),
1312            provider,
1313            kind: "issues".to_string(),
1314            received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
1315            occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
1316            dedupe_key: "delivery-123".to_string(),
1317            trace_id: TraceId("trace_fixed".to_string()),
1318            tenant_id: Some(TenantId("tenant_1".to_string())),
1319            headers,
1320            provider_payload: payload,
1321            signature_status: SignatureStatus::Verified,
1322            dedupe_claimed: false,
1323            batch: None,
1324        };
1325
1326        let once = serde_json::to_value(&event).unwrap();
1327        let decoded: TriggerEvent = serde_json::from_value(once.clone()).unwrap();
1328        let twice = serde_json::to_value(&decoded).unwrap();
1329        assert_eq!(decoded, event);
1330        assert_eq!(once, twice);
1331    }
1332
1333    #[test]
1334    fn unknown_provider_errors() {
1335        let error = ProviderPayload::normalize(
1336            &ProviderId::from("custom-provider"),
1337            "thing.happened",
1338            &BTreeMap::new(),
1339            serde_json::json!({"ok": true}),
1340        )
1341        .unwrap_err();
1342        assert_eq!(
1343            error,
1344            ProviderCatalogError::UnknownProvider("custom-provider".to_string())
1345        );
1346    }
1347}