1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Serialize};
5use serde_json::Value as JsonValue;
6
7use super::core::ProviderId;
8use super::normalize::{
9 a2a_push_payload, cron_payload, email_payload, github_payload, kafka_payload, linear_payload,
10 nats_payload, notion_payload, postgres_cdc_payload, pulsar_payload, slack_payload,
11 webhook_payload, websocket_payload,
12};
13use super::payloads::ProviderPayload;
14
15impl ProviderPayload {
16 pub fn normalize(
17 provider: &ProviderId,
18 kind: &str,
19 headers: &BTreeMap<String, String>,
20 raw: JsonValue,
21 ) -> Result<Self, ProviderCatalogError> {
22 provider_catalog()
23 .read()
24 .expect("provider catalog poisoned")
25 .normalize(provider, kind, headers, raw)
26 }
27}
28
/// A secret that a provider declares as part of its metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderSecretRequirement {
    /// Name of the secret (for example `signing_secret`).
    pub name: String,
    /// Whether the secret is mandatory. Only entries with `required == true`
    /// are yielded by `ProviderMetadata::required_secret_names`.
    pub required: bool,
    /// Namespace the secret lives in. The built-in catalog entries in this
    /// file always pass the provider id here.
    pub namespace: String,
}
35
/// Names one outbound method that a provider advertises in its metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderOutboundMethod {
    /// Method name as advertised (for example `post_message` or `graphql`).
    pub name: String,
}
40
/// Describes how inbound requests from a provider are authenticated.
///
/// Serialized as an internally tagged enum: the variant name is stored under
/// the `kind` key in snake_case (`"none"` or `"hmac"`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum SignatureVerificationMetadata {
    /// No signature verification is declared. This is the `Default` variant.
    #[default]
    None,
    /// Parameters for an HMAC-based signature scheme.
    Hmac {
        /// Identifier of the signing scheme; built-in entries use values such
        /// as `"github"`, `"slack"`, and `"standard"`.
        variant: String,
        /// Always `true` for entries built by `hmac_signature_metadata`.
        /// NOTE(review): presumably means the signature covers the raw
        /// request body — confirm against the verifier.
        raw_body: bool,
        /// Name of the header that carries the signature.
        signature_header: String,
        /// Name of the header that carries the request timestamp, if the
        /// scheme declares one.
        timestamp_header: Option<String>,
        /// Name of the header that carries a delivery/message id, if the
        /// scheme declares one.
        id_header: Option<String>,
        /// Default tolerance in seconds, if any.
        /// NOTE(review): presumably the allowed timestamp skew — confirm.
        default_tolerance_secs: Option<i64>,
        /// Digest algorithm name; `hmac_signature_metadata` always sets
        /// `"sha256"`.
        digest: String,
        /// Signature encoding name; built-in entries use `"hex"` or
        /// `"base64"`.
        encoding: String,
    },
}
57
/// Describes which runtime backs a provider.
///
/// Serialized as an internally tagged enum: the variant name is stored under
/// the `kind` key in snake_case (`"builtin"` or `"placeholder"`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ProviderRuntimeMetadata {
    /// The provider is served by a named built-in connector.
    Builtin {
        /// Connector name; the built-in catalog uses `"cron"`, `"webhook"`,
        /// `"a2a-push"`, and `"stream"`.
        connector: String,
        /// Signature variant the connector defaults to, if any (the built-in
        /// `webhook` entry sets `"standard"`).
        default_signature_variant: Option<String>,
    },
    /// The `Default` variant; used by the built-in github, slack, linear, and
    /// notion entries.
    /// NOTE(review): presumably means no in-process connector is bundled for
    /// the provider — confirm with the connector loader.
    #[default]
    Placeholder,
}
68
/// Descriptive metadata for one provider in the catalog.
///
/// When deserializing, `provider` and `schema_name` must be present; every
/// field marked `#[serde(default)]` falls back to its `Default` value when
/// absent from the input.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
pub struct ProviderMetadata {
    /// Provider id (for example `github`).
    pub provider: String,
    /// Trigger kinds the provider supports; queried by `supports_kind`.
    #[serde(default)]
    pub kinds: Vec<String>,
    /// Name of the payload schema produced for this provider.
    pub schema_name: String,
    /// Outbound methods the provider advertises.
    #[serde(default)]
    pub outbound_methods: Vec<ProviderOutboundMethod>,
    /// Secrets the provider declares, both required and optional.
    #[serde(default)]
    pub secret_requirements: Vec<ProviderSecretRequirement>,
    /// How inbound requests are authenticated.
    #[serde(default)]
    pub signature_verification: SignatureVerificationMetadata,
    /// Which runtime backs the provider.
    #[serde(default)]
    pub runtime: ProviderRuntimeMetadata,
}
84
85impl ProviderMetadata {
86 pub fn supports_kind(&self, kind: &str) -> bool {
87 self.kinds.iter().any(|candidate| candidate == kind)
88 }
89
90 pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
91 self.secret_requirements
92 .iter()
93 .filter(|requirement| requirement.required)
94 .map(|requirement| requirement.name.as_str())
95 }
96}
97
98pub trait ProviderSchema: Send + Sync {
99 fn provider_id(&self) -> &str;
100 fn harn_schema_name(&self) -> &str;
101 fn metadata(&self) -> ProviderMetadata {
102 ProviderMetadata {
103 provider: self.provider_id().to_string(),
104 schema_name: self.harn_schema_name().to_string(),
105 ..ProviderMetadata::default()
106 }
107 }
108 fn normalize(
109 &self,
110 kind: &str,
111 headers: &BTreeMap<String, String>,
112 raw: JsonValue,
113 ) -> Result<ProviderPayload, ProviderCatalogError>;
114}
115
/// Errors reported by provider catalog operations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProviderCatalogError {
    /// A schema with this provider id is already registered; carries the id.
    DuplicateProvider(String),
    /// No schema is registered for this provider id; carries the id.
    UnknownProvider(String),
}
121
122impl std::fmt::Display for ProviderCatalogError {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 match self {
125 Self::DuplicateProvider(provider) => {
126 write!(f, "provider `{provider}` is already registered")
127 }
128 Self::UnknownProvider(provider) => write!(f, "provider `{provider}` is not registered"),
129 }
130 }
131}
132
// Empty impl: the derived `Debug` and the `Display` impl for this type satisfy
// the trait's requirements, and there is no underlying source error to expose.
impl std::error::Error for ProviderCatalogError {}
134
/// A registry of provider schemas keyed by provider id.
///
/// Cloning is shallow with respect to the schemas: each entry is an `Arc`, so
/// a clone shares the same schema objects.
#[derive(Clone, Default)]
pub struct ProviderCatalog {
    // Provider id -> schema. A `BTreeMap` keeps iteration ordered by id, which
    // is the order `schema_names` and `entries` report.
    providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
}
139
140impl ProviderCatalog {
141 pub fn with_defaults() -> Self {
142 let mut catalog = Self::default();
143 for schema in default_provider_schemas() {
144 catalog
145 .register(schema)
146 .expect("default providers must register cleanly");
147 }
148 catalog
149 }
150
151 pub fn with_defaults_and(
152 schemas: Vec<Arc<dyn ProviderSchema>>,
153 ) -> Result<Self, ProviderCatalogError> {
154 let mut catalog = Self::with_defaults();
155 let builtin_providers: BTreeSet<String> = catalog.schema_names().into_keys().collect();
156 for schema in schemas {
157 if builtin_providers.contains(schema.provider_id()) {
158 continue;
159 }
160 catalog.register(schema)?;
161 }
162 Ok(catalog)
163 }
164
165 pub fn register(
166 &mut self,
167 schema: Arc<dyn ProviderSchema>,
168 ) -> Result<(), ProviderCatalogError> {
169 let provider = schema.provider_id().to_string();
170 if self.providers.contains_key(provider.as_str()) {
171 return Err(ProviderCatalogError::DuplicateProvider(provider));
172 }
173 self.providers.insert(provider, schema);
174 Ok(())
175 }
176
177 pub fn normalize(
178 &self,
179 provider: &ProviderId,
180 kind: &str,
181 headers: &BTreeMap<String, String>,
182 raw: JsonValue,
183 ) -> Result<ProviderPayload, ProviderCatalogError> {
184 let schema = self
185 .providers
186 .get(provider.as_str())
187 .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
188 schema.normalize(kind, headers, raw)
189 }
190
191 pub fn schema_names(&self) -> BTreeMap<String, String> {
192 self.providers
193 .iter()
194 .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
195 .collect()
196 }
197
198 pub fn entries(&self) -> Vec<ProviderMetadata> {
199 self.providers
200 .values()
201 .map(|schema| schema.metadata())
202 .collect()
203 }
204
205 pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
206 self.providers.get(provider).map(|schema| schema.metadata())
207 }
208}
209
210pub fn register_provider_schema(
211 schema: Arc<dyn ProviderSchema>,
212) -> Result<(), ProviderCatalogError> {
213 provider_catalog()
214 .write()
215 .expect("provider catalog poisoned")
216 .register(schema)
217}
218
219pub fn reset_provider_catalog() {
220 *provider_catalog()
221 .write()
222 .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
223}
224
225pub fn reset_provider_catalog_with(
226 schemas: Vec<Arc<dyn ProviderSchema>>,
227) -> Result<(), ProviderCatalogError> {
228 let catalog = ProviderCatalog::with_defaults_and(schemas)?;
229 install_provider_catalog(catalog);
230 Ok(())
231}
232
233pub fn install_provider_catalog(catalog: ProviderCatalog) {
234 *provider_catalog()
235 .write()
236 .expect("provider catalog poisoned") = catalog;
237}
238
239pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
240 provider_catalog()
241 .read()
242 .expect("provider catalog poisoned")
243 .schema_names()
244}
245
246pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
247 provider_catalog()
248 .read()
249 .expect("provider catalog poisoned")
250 .entries()
251}
252
253pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
254 provider_catalog()
255 .read()
256 .expect("provider catalog poisoned")
257 .metadata_for(provider)
258}
259
260fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
261 static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
262 PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
263}
264
/// A `ProviderSchema` backed by static data and a plain function pointer;
/// used for every entry of the built-in catalog.
struct BuiltinProviderSchema {
    /// Id returned by `provider_id`.
    provider_id: &'static str,
    /// Schema name returned by `harn_schema_name`.
    harn_schema_name: &'static str,
    /// Metadata returned (cloned) by `metadata`.
    metadata: ProviderMetadata,
    /// Infallible normalizer; `normalize` wraps its result in `Ok`.
    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
}
271
272impl ProviderSchema for BuiltinProviderSchema {
273 fn provider_id(&self) -> &str {
274 self.provider_id
275 }
276
277 fn harn_schema_name(&self) -> &str {
278 self.harn_schema_name
279 }
280
281 fn metadata(&self) -> ProviderMetadata {
282 self.metadata.clone()
283 }
284
285 fn normalize(
286 &self,
287 kind: &str,
288 headers: &BTreeMap<String, String>,
289 raw: JsonValue,
290 ) -> Result<ProviderPayload, ProviderCatalogError> {
291 Ok((self.normalize)(kind, headers, raw))
292 }
293}
294
295fn provider_metadata_entry(
296 provider: &str,
297 kinds: &[&str],
298 schema_name: &str,
299 outbound_methods: &[&str],
300 signature_verification: SignatureVerificationMetadata,
301 secret_requirements: Vec<ProviderSecretRequirement>,
302 runtime: ProviderRuntimeMetadata,
303) -> ProviderMetadata {
304 ProviderMetadata {
305 provider: provider.to_string(),
306 kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
307 schema_name: schema_name.to_string(),
308 outbound_methods: outbound_methods
309 .iter()
310 .map(|name| ProviderOutboundMethod {
311 name: (*name).to_string(),
312 })
313 .collect(),
314 secret_requirements,
315 signature_verification,
316 runtime,
317 }
318}
319
320fn hmac_signature_metadata(
321 variant: &str,
322 signature_header: &str,
323 timestamp_header: Option<&str>,
324 id_header: Option<&str>,
325 default_tolerance_secs: Option<i64>,
326 encoding: &str,
327) -> SignatureVerificationMetadata {
328 SignatureVerificationMetadata::Hmac {
329 variant: variant.to_string(),
330 raw_body: true,
331 signature_header: signature_header.to_string(),
332 timestamp_header: timestamp_header.map(ToString::to_string),
333 id_header: id_header.map(ToString::to_string),
334 default_tolerance_secs,
335 digest: "sha256".to_string(),
336 encoding: encoding.to_string(),
337 }
338}
339
340fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
341 ProviderSecretRequirement {
342 name: name.to_string(),
343 required: true,
344 namespace: namespace.to_string(),
345 }
346}
347
348fn outbound_method(name: &str) -> ProviderOutboundMethod {
349 ProviderOutboundMethod {
350 name: name.to_string(),
351 }
352}
353
354fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
355 ProviderSecretRequirement {
356 name: name.to_string(),
357 required: false,
358 namespace: namespace.to_string(),
359 }
360}
361
362fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
363 vec![
364 Arc::new(BuiltinProviderSchema {
365 provider_id: "github",
366 harn_schema_name: "GitHubEventPayload",
367 metadata: provider_metadata_entry(
368 "github",
369 &["webhook"],
370 "GitHubEventPayload",
371 &[
372 "github.pr.list",
373 "github.pr.view",
374 "github.pr.checks",
375 "github.pr.merge",
376 "github.pr.enable_auto_merge",
377 "github.pr.comment",
378 "github.actions.workflow_dispatch",
379 "github.actions.runs",
380 "github.actions.run",
381 "github.actions.logs",
382 "github.release.latest",
383 "github.release.assets",
384 "github.merge_queue.entries",
385 "github.merge_queue.enqueue",
386 "github.issue.create",
387 "github.issue.comment",
388 "github.branch.protection",
389 "api_call",
390 "issues.create_comment",
391 "issues.create",
392 "issues.create_with_template",
393 "issues.update",
394 "issues.add_labels",
395 "pulls.list",
396 "pulls.list_with_checks",
397 "pulls.get",
398 "pulls.merge",
399 "pulls.merge_safe",
400 "pulls.create_review_comment",
401 "pulls.get_diff",
402 "pulls.list_files",
403 "pulls.list_reviews",
404 "repos.get_content",
405 "repos.get_text",
406 "repos.get_latest_release",
407 "repos.list_release_assets",
408 "repos.get_branch_protection",
409 "git.delete_ref",
410 "actions.workflow_dispatch",
411 "actions.workflow_runs.list",
412 "actions.workflow_run.get",
413 "check_runs.create",
414 "check_runs.update",
415 "graphql",
416 ],
417 hmac_signature_metadata(
418 "github",
419 "X-Hub-Signature-256",
420 None,
421 Some("X-GitHub-Delivery"),
422 None,
423 "hex",
424 ),
425 vec![required_secret("signing_secret", "github")],
426 ProviderRuntimeMetadata::Placeholder,
427 ),
428 normalize: github_payload,
429 }),
430 Arc::new(BuiltinProviderSchema {
431 provider_id: "slack",
432 harn_schema_name: "SlackEventPayload",
433 metadata: provider_metadata_entry(
434 "slack",
435 &["webhook"],
436 "SlackEventPayload",
437 &[
438 "post_message",
439 "update_message",
440 "add_reaction",
441 "open_view",
442 "user_info",
443 "api_call",
444 "upload_file",
445 ],
446 hmac_signature_metadata(
447 "slack",
448 "X-Slack-Signature",
449 Some("X-Slack-Request-Timestamp"),
450 None,
451 Some(300),
452 "hex",
453 ),
454 vec![required_secret("signing_secret", "slack")],
455 ProviderRuntimeMetadata::Placeholder,
456 ),
457 normalize: slack_payload,
458 }),
459 Arc::new(BuiltinProviderSchema {
460 provider_id: "linear",
461 harn_schema_name: "LinearEventPayload",
462 metadata: {
463 let mut metadata = provider_metadata_entry(
464 "linear",
465 &["webhook"],
466 "LinearEventPayload",
467 &[],
468 hmac_signature_metadata(
469 "linear",
470 "Linear-Signature",
471 None,
472 Some("Linear-Delivery"),
473 Some(75),
474 "hex",
475 ),
476 vec![
477 required_secret("signing_secret", "linear"),
478 optional_secret("access_token", "linear"),
479 ],
480 ProviderRuntimeMetadata::Placeholder,
481 );
482 metadata.outbound_methods = vec![
483 outbound_method("list_issues"),
484 outbound_method("update_issue"),
485 outbound_method("create_comment"),
486 outbound_method("search"),
487 outbound_method("graphql"),
488 ];
489 metadata
490 },
491 normalize: linear_payload,
492 }),
493 Arc::new(BuiltinProviderSchema {
494 provider_id: "notion",
495 harn_schema_name: "NotionEventPayload",
496 metadata: {
497 let mut metadata = provider_metadata_entry(
498 "notion",
499 &["webhook", "poll"],
500 "NotionEventPayload",
501 &[],
502 hmac_signature_metadata(
503 "notion",
504 "X-Notion-Signature",
505 None,
506 None,
507 None,
508 "hex",
509 ),
510 vec![required_secret("verification_token", "notion")],
511 ProviderRuntimeMetadata::Placeholder,
512 );
513 metadata.outbound_methods = vec![
514 outbound_method("get_page"),
515 outbound_method("update_page"),
516 outbound_method("append_blocks"),
517 outbound_method("query_database"),
518 outbound_method("search"),
519 outbound_method("create_comment"),
520 outbound_method("api_call"),
521 ];
522 metadata
523 },
524 normalize: notion_payload,
525 }),
526 Arc::new(BuiltinProviderSchema {
527 provider_id: "cron",
528 harn_schema_name: "CronEventPayload",
529 metadata: provider_metadata_entry(
530 "cron",
531 &["cron"],
532 "CronEventPayload",
533 &[],
534 SignatureVerificationMetadata::None,
535 Vec::new(),
536 ProviderRuntimeMetadata::Builtin {
537 connector: "cron".to_string(),
538 default_signature_variant: None,
539 },
540 ),
541 normalize: cron_payload,
542 }),
543 Arc::new(BuiltinProviderSchema {
544 provider_id: "webhook",
545 harn_schema_name: "GenericWebhookPayload",
546 metadata: provider_metadata_entry(
547 "webhook",
548 &["webhook"],
549 "GenericWebhookPayload",
550 &[],
551 hmac_signature_metadata(
552 "standard",
553 "webhook-signature",
554 Some("webhook-timestamp"),
555 Some("webhook-id"),
556 Some(300),
557 "base64",
558 ),
559 vec![required_secret("signing_secret", "webhook")],
560 ProviderRuntimeMetadata::Builtin {
561 connector: "webhook".to_string(),
562 default_signature_variant: Some("standard".to_string()),
563 },
564 ),
565 normalize: webhook_payload,
566 }),
567 Arc::new(BuiltinProviderSchema {
568 provider_id: "a2a-push",
569 harn_schema_name: "A2aPushPayload",
570 metadata: provider_metadata_entry(
571 "a2a-push",
572 &["a2a-push"],
573 "A2aPushPayload",
574 &[],
575 SignatureVerificationMetadata::None,
576 Vec::new(),
577 ProviderRuntimeMetadata::Builtin {
578 connector: "a2a-push".to_string(),
579 default_signature_variant: None,
580 },
581 ),
582 normalize: a2a_push_payload,
583 }),
584 Arc::new(stream_provider_schema("kafka", kafka_payload)),
585 Arc::new(stream_provider_schema("nats", nats_payload)),
586 Arc::new(stream_provider_schema("pulsar", pulsar_payload)),
587 Arc::new(stream_provider_schema("postgres-cdc", postgres_cdc_payload)),
588 Arc::new(stream_provider_schema("email", email_payload)),
589 Arc::new(stream_provider_schema("websocket", websocket_payload)),
590 ]
591}
592
593fn stream_provider_schema(
594 provider_id: &'static str,
595 normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
596) -> BuiltinProviderSchema {
597 BuiltinProviderSchema {
598 provider_id,
599 harn_schema_name: "StreamEventPayload",
600 metadata: provider_metadata_entry(
601 provider_id,
602 &["stream"],
603 "StreamEventPayload",
604 &[],
605 SignatureVerificationMetadata::None,
606 Vec::new(),
607 ProviderRuntimeMetadata::Builtin {
608 connector: "stream".to_string(),
609 default_signature_variant: None,
610 },
611 ),
612 normalize,
613 }
614}