Skip to main content

harn_vm/triggers/event/
catalog.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Serialize};
5use serde_json::Value as JsonValue;
6
7use super::core::ProviderId;
8use super::normalize::{
9    a2a_push_payload, cron_payload, email_payload, github_payload, kafka_payload, linear_payload,
10    nats_payload, notion_payload, postgres_cdc_payload, pulsar_payload, slack_payload,
11    webhook_payload, websocket_payload,
12};
13use super::payloads::ProviderPayload;
14
15impl ProviderPayload {
16    pub fn normalize(
17        provider: &ProviderId,
18        kind: &str,
19        headers: &BTreeMap<String, String>,
20        raw: JsonValue,
21    ) -> Result<Self, ProviderCatalogError> {
22        provider_catalog()
23            .read()
24            .expect("provider catalog poisoned")
25            .normalize(provider, kind, headers, raw)
26    }
27}
28
29#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
30pub struct ProviderSecretRequirement {
31    pub name: String,
32    pub required: bool,
33    pub namespace: String,
34}
35
36#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
37pub struct ProviderOutboundMethod {
38    pub name: String,
39}
40
41#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
42#[serde(tag = "kind", rename_all = "snake_case")]
43pub enum SignatureVerificationMetadata {
44    #[default]
45    None,
46    Hmac {
47        variant: String,
48        raw_body: bool,
49        signature_header: String,
50        timestamp_header: Option<String>,
51        id_header: Option<String>,
52        default_tolerance_secs: Option<i64>,
53        digest: String,
54        encoding: String,
55    },
56}
57
58#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
59#[serde(tag = "kind", rename_all = "snake_case")]
60pub enum ProviderRuntimeMetadata {
61    Builtin {
62        connector: String,
63        default_signature_variant: Option<String>,
64    },
65    #[default]
66    Placeholder,
67}
68
69#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
70pub struct ProviderMetadata {
71    pub provider: String,
72    #[serde(default)]
73    pub kinds: Vec<String>,
74    pub schema_name: String,
75    #[serde(default)]
76    pub outbound_methods: Vec<ProviderOutboundMethod>,
77    #[serde(default)]
78    pub secret_requirements: Vec<ProviderSecretRequirement>,
79    #[serde(default)]
80    pub signature_verification: SignatureVerificationMetadata,
81    #[serde(default)]
82    pub runtime: ProviderRuntimeMetadata,
83}
84
85impl ProviderMetadata {
86    pub fn supports_kind(&self, kind: &str) -> bool {
87        self.kinds.iter().any(|candidate| candidate == kind)
88    }
89
90    pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
91        self.secret_requirements
92            .iter()
93            .filter(|requirement| requirement.required)
94            .map(|requirement| requirement.name.as_str())
95    }
96}
97
98pub trait ProviderSchema: Send + Sync {
99    fn provider_id(&self) -> &str;
100    fn harn_schema_name(&self) -> &str;
101    fn metadata(&self) -> ProviderMetadata {
102        ProviderMetadata {
103            provider: self.provider_id().to_string(),
104            schema_name: self.harn_schema_name().to_string(),
105            ..ProviderMetadata::default()
106        }
107    }
108    fn normalize(
109        &self,
110        kind: &str,
111        headers: &BTreeMap<String, String>,
112        raw: JsonValue,
113    ) -> Result<ProviderPayload, ProviderCatalogError>;
114}
115
/// Errors reported by [`ProviderCatalog`] operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProviderCatalogError {
    /// A schema with this provider id is already registered.
    DuplicateProvider(String),
    /// No schema is registered under this provider id.
    UnknownProvider(String),
}

impl std::fmt::Display for ProviderCatalogError {
    /// Writes a one-line, human-readable description naming the provider id.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnknownProvider(provider) => write!(f, "provider `{provider}` is not registered"),
            Self::DuplicateProvider(provider) => {
                write!(f, "provider `{provider}` is already registered")
            }
        }
    }
}

impl std::error::Error for ProviderCatalogError {}
134
135#[derive(Clone, Default)]
136pub struct ProviderCatalog {
137    providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
138}
139
140impl ProviderCatalog {
141    pub fn with_defaults() -> Self {
142        let mut catalog = Self::default();
143        for schema in default_provider_schemas() {
144            catalog
145                .register(schema)
146                .expect("default providers must register cleanly");
147        }
148        catalog
149    }
150
151    pub fn with_defaults_and(
152        schemas: Vec<Arc<dyn ProviderSchema>>,
153    ) -> Result<Self, ProviderCatalogError> {
154        let mut catalog = Self::with_defaults();
155        let builtin_providers: BTreeSet<String> = catalog.schema_names().into_keys().collect();
156        for schema in schemas {
157            if builtin_providers.contains(schema.provider_id()) {
158                continue;
159            }
160            catalog.register(schema)?;
161        }
162        Ok(catalog)
163    }
164
165    pub fn register(
166        &mut self,
167        schema: Arc<dyn ProviderSchema>,
168    ) -> Result<(), ProviderCatalogError> {
169        let provider = schema.provider_id().to_string();
170        if self.providers.contains_key(provider.as_str()) {
171            return Err(ProviderCatalogError::DuplicateProvider(provider));
172        }
173        self.providers.insert(provider, schema);
174        Ok(())
175    }
176
177    pub fn normalize(
178        &self,
179        provider: &ProviderId,
180        kind: &str,
181        headers: &BTreeMap<String, String>,
182        raw: JsonValue,
183    ) -> Result<ProviderPayload, ProviderCatalogError> {
184        let schema = self
185            .providers
186            .get(provider.as_str())
187            .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
188        schema.normalize(kind, headers, raw)
189    }
190
191    pub fn schema_names(&self) -> BTreeMap<String, String> {
192        self.providers
193            .iter()
194            .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
195            .collect()
196    }
197
198    pub fn entries(&self) -> Vec<ProviderMetadata> {
199        self.providers
200            .values()
201            .map(|schema| schema.metadata())
202            .collect()
203    }
204
205    pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
206        self.providers.get(provider).map(|schema| schema.metadata())
207    }
208}
209
210pub fn register_provider_schema(
211    schema: Arc<dyn ProviderSchema>,
212) -> Result<(), ProviderCatalogError> {
213    provider_catalog()
214        .write()
215        .expect("provider catalog poisoned")
216        .register(schema)
217}
218
219pub fn reset_provider_catalog() {
220    *provider_catalog()
221        .write()
222        .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
223}
224
225pub fn reset_provider_catalog_with(
226    schemas: Vec<Arc<dyn ProviderSchema>>,
227) -> Result<(), ProviderCatalogError> {
228    let catalog = ProviderCatalog::with_defaults_and(schemas)?;
229    install_provider_catalog(catalog);
230    Ok(())
231}
232
233pub fn install_provider_catalog(catalog: ProviderCatalog) {
234    *provider_catalog()
235        .write()
236        .expect("provider catalog poisoned") = catalog;
237}
238
239pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
240    provider_catalog()
241        .read()
242        .expect("provider catalog poisoned")
243        .schema_names()
244}
245
246pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
247    provider_catalog()
248        .read()
249        .expect("provider catalog poisoned")
250        .entries()
251}
252
253pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
254    provider_catalog()
255        .read()
256        .expect("provider catalog poisoned")
257        .metadata_for(provider)
258}
259
260fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
261    static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
262    PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
263}
264
265struct BuiltinProviderSchema {
266    provider_id: &'static str,
267    harn_schema_name: &'static str,
268    metadata: ProviderMetadata,
269    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
270}
271
272impl ProviderSchema for BuiltinProviderSchema {
273    fn provider_id(&self) -> &str {
274        self.provider_id
275    }
276
277    fn harn_schema_name(&self) -> &str {
278        self.harn_schema_name
279    }
280
281    fn metadata(&self) -> ProviderMetadata {
282        self.metadata.clone()
283    }
284
285    fn normalize(
286        &self,
287        kind: &str,
288        headers: &BTreeMap<String, String>,
289        raw: JsonValue,
290    ) -> Result<ProviderPayload, ProviderCatalogError> {
291        Ok((self.normalize)(kind, headers, raw))
292    }
293}
294
295fn provider_metadata_entry(
296    provider: &str,
297    kinds: &[&str],
298    schema_name: &str,
299    outbound_methods: &[&str],
300    signature_verification: SignatureVerificationMetadata,
301    secret_requirements: Vec<ProviderSecretRequirement>,
302    runtime: ProviderRuntimeMetadata,
303) -> ProviderMetadata {
304    ProviderMetadata {
305        provider: provider.to_string(),
306        kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
307        schema_name: schema_name.to_string(),
308        outbound_methods: outbound_methods
309            .iter()
310            .map(|name| ProviderOutboundMethod {
311                name: (*name).to_string(),
312            })
313            .collect(),
314        secret_requirements,
315        signature_verification,
316        runtime,
317    }
318}
319
320fn hmac_signature_metadata(
321    variant: &str,
322    signature_header: &str,
323    timestamp_header: Option<&str>,
324    id_header: Option<&str>,
325    default_tolerance_secs: Option<i64>,
326    encoding: &str,
327) -> SignatureVerificationMetadata {
328    SignatureVerificationMetadata::Hmac {
329        variant: variant.to_string(),
330        raw_body: true,
331        signature_header: signature_header.to_string(),
332        timestamp_header: timestamp_header.map(ToString::to_string),
333        id_header: id_header.map(ToString::to_string),
334        default_tolerance_secs,
335        digest: "sha256".to_string(),
336        encoding: encoding.to_string(),
337    }
338}
339
340fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
341    ProviderSecretRequirement {
342        name: name.to_string(),
343        required: true,
344        namespace: namespace.to_string(),
345    }
346}
347
348fn outbound_method(name: &str) -> ProviderOutboundMethod {
349    ProviderOutboundMethod {
350        name: name.to_string(),
351    }
352}
353
354fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
355    ProviderSecretRequirement {
356        name: name.to_string(),
357        required: false,
358        namespace: namespace.to_string(),
359    }
360}
361
362fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
363    vec![
364        Arc::new(BuiltinProviderSchema {
365            provider_id: "github",
366            harn_schema_name: "GitHubEventPayload",
367            metadata: provider_metadata_entry(
368                "github",
369                &["webhook"],
370                "GitHubEventPayload",
371                &[
372                    "github.pr.list",
373                    "github.pr.view",
374                    "github.pr.checks",
375                    "github.pr.merge",
376                    "github.pr.enable_auto_merge",
377                    "github.pr.comment",
378                    "github.actions.workflow_dispatch",
379                    "github.actions.runs",
380                    "github.actions.run",
381                    "github.actions.logs",
382                    "github.release.latest",
383                    "github.release.assets",
384                    "github.merge_queue.entries",
385                    "github.merge_queue.enqueue",
386                    "github.issue.create",
387                    "github.issue.comment",
388                    "github.branch.protection",
389                    "api_call",
390                    "issues.create_comment",
391                    "issues.create",
392                    "issues.create_with_template",
393                    "issues.update",
394                    "issues.add_labels",
395                    "pulls.list",
396                    "pulls.list_with_checks",
397                    "pulls.get",
398                    "pulls.merge",
399                    "pulls.merge_safe",
400                    "pulls.create_review_comment",
401                    "pulls.get_diff",
402                    "pulls.list_files",
403                    "pulls.list_reviews",
404                    "repos.get_content",
405                    "repos.get_text",
406                    "repos.get_latest_release",
407                    "repos.list_release_assets",
408                    "repos.get_branch_protection",
409                    "git.delete_ref",
410                    "actions.workflow_dispatch",
411                    "actions.workflow_runs.list",
412                    "actions.workflow_run.get",
413                    "check_runs.create",
414                    "check_runs.update",
415                    "graphql",
416                ],
417                hmac_signature_metadata(
418                    "github",
419                    "X-Hub-Signature-256",
420                    None,
421                    Some("X-GitHub-Delivery"),
422                    None,
423                    "hex",
424                ),
425                vec![required_secret("signing_secret", "github")],
426                ProviderRuntimeMetadata::Placeholder,
427            ),
428            normalize: github_payload,
429        }),
430        Arc::new(BuiltinProviderSchema {
431            provider_id: "slack",
432            harn_schema_name: "SlackEventPayload",
433            metadata: provider_metadata_entry(
434                "slack",
435                &["webhook"],
436                "SlackEventPayload",
437                &[
438                    "post_message",
439                    "update_message",
440                    "add_reaction",
441                    "open_view",
442                    "user_info",
443                    "api_call",
444                    "upload_file",
445                ],
446                hmac_signature_metadata(
447                    "slack",
448                    "X-Slack-Signature",
449                    Some("X-Slack-Request-Timestamp"),
450                    None,
451                    Some(300),
452                    "hex",
453                ),
454                vec![required_secret("signing_secret", "slack")],
455                ProviderRuntimeMetadata::Placeholder,
456            ),
457            normalize: slack_payload,
458        }),
459        Arc::new(BuiltinProviderSchema {
460            provider_id: "linear",
461            harn_schema_name: "LinearEventPayload",
462            metadata: {
463                let mut metadata = provider_metadata_entry(
464                    "linear",
465                    &["webhook"],
466                    "LinearEventPayload",
467                    &[],
468                    hmac_signature_metadata(
469                        "linear",
470                        "Linear-Signature",
471                        None,
472                        Some("Linear-Delivery"),
473                        Some(75),
474                        "hex",
475                    ),
476                    vec![
477                        required_secret("signing_secret", "linear"),
478                        optional_secret("access_token", "linear"),
479                    ],
480                    ProviderRuntimeMetadata::Placeholder,
481                );
482                metadata.outbound_methods = vec![
483                    outbound_method("list_issues"),
484                    outbound_method("update_issue"),
485                    outbound_method("create_comment"),
486                    outbound_method("search"),
487                    outbound_method("graphql"),
488                ];
489                metadata
490            },
491            normalize: linear_payload,
492        }),
493        Arc::new(BuiltinProviderSchema {
494            provider_id: "notion",
495            harn_schema_name: "NotionEventPayload",
496            metadata: {
497                let mut metadata = provider_metadata_entry(
498                    "notion",
499                    &["webhook", "poll"],
500                    "NotionEventPayload",
501                    &[],
502                    hmac_signature_metadata(
503                        "notion",
504                        "X-Notion-Signature",
505                        None,
506                        None,
507                        None,
508                        "hex",
509                    ),
510                    vec![required_secret("verification_token", "notion")],
511                    ProviderRuntimeMetadata::Placeholder,
512                );
513                metadata.outbound_methods = vec![
514                    outbound_method("get_page"),
515                    outbound_method("update_page"),
516                    outbound_method("append_blocks"),
517                    outbound_method("query_database"),
518                    outbound_method("search"),
519                    outbound_method("create_comment"),
520                    outbound_method("api_call"),
521                ];
522                metadata
523            },
524            normalize: notion_payload,
525        }),
526        Arc::new(BuiltinProviderSchema {
527            provider_id: "cron",
528            harn_schema_name: "CronEventPayload",
529            metadata: provider_metadata_entry(
530                "cron",
531                &["cron"],
532                "CronEventPayload",
533                &[],
534                SignatureVerificationMetadata::None,
535                Vec::new(),
536                ProviderRuntimeMetadata::Builtin {
537                    connector: "cron".to_string(),
538                    default_signature_variant: None,
539                },
540            ),
541            normalize: cron_payload,
542        }),
543        Arc::new(BuiltinProviderSchema {
544            provider_id: "webhook",
545            harn_schema_name: "GenericWebhookPayload",
546            metadata: provider_metadata_entry(
547                "webhook",
548                &["webhook"],
549                "GenericWebhookPayload",
550                &[],
551                hmac_signature_metadata(
552                    "standard",
553                    "webhook-signature",
554                    Some("webhook-timestamp"),
555                    Some("webhook-id"),
556                    Some(300),
557                    "base64",
558                ),
559                vec![required_secret("signing_secret", "webhook")],
560                ProviderRuntimeMetadata::Builtin {
561                    connector: "webhook".to_string(),
562                    default_signature_variant: Some("standard".to_string()),
563                },
564            ),
565            normalize: webhook_payload,
566        }),
567        Arc::new(BuiltinProviderSchema {
568            provider_id: "a2a-push",
569            harn_schema_name: "A2aPushPayload",
570            metadata: provider_metadata_entry(
571                "a2a-push",
572                &["a2a-push"],
573                "A2aPushPayload",
574                &[],
575                SignatureVerificationMetadata::None,
576                Vec::new(),
577                ProviderRuntimeMetadata::Builtin {
578                    connector: "a2a-push".to_string(),
579                    default_signature_variant: None,
580                },
581            ),
582            normalize: a2a_push_payload,
583        }),
584        Arc::new(stream_provider_schema("kafka", kafka_payload)),
585        Arc::new(stream_provider_schema("nats", nats_payload)),
586        Arc::new(stream_provider_schema("pulsar", pulsar_payload)),
587        Arc::new(stream_provider_schema("postgres-cdc", postgres_cdc_payload)),
588        Arc::new(stream_provider_schema("email", email_payload)),
589        Arc::new(stream_provider_schema("websocket", websocket_payload)),
590    ]
591}
592
593fn stream_provider_schema(
594    provider_id: &'static str,
595    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
596) -> BuiltinProviderSchema {
597    BuiltinProviderSchema {
598        provider_id,
599        harn_schema_name: "StreamEventPayload",
600        metadata: provider_metadata_entry(
601            provider_id,
602            &["stream"],
603            "StreamEventPayload",
604            &[],
605            SignatureVerificationMetadata::None,
606            Vec::new(),
607            ProviderRuntimeMetadata::Builtin {
608                connector: "stream".to_string(),
609                default_signature_variant: None,
610            },
611        ),
612        normalize,
613    }
614}