1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
11const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13fn serialize_optional_bytes_b64<S>(
14 value: &Option<Vec<u8>>,
15 serializer: S,
16) -> Result<S::Ok, S::Error>
17where
18 S: Serializer,
19{
20 use base64::Engine;
21
22 match value {
23 Some(bytes) => {
24 serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
25 }
26 None => serializer.serialize_none(),
27 }
28}
29
30fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
31where
32 D: Deserializer<'de>,
33{
34 use base64::Engine;
35
36 let encoded = Option::<String>::deserialize(deserializer)?;
37 encoded
38 .map(|text| {
39 base64::engine::general_purpose::STANDARD
40 .decode(text.as_bytes())
41 .map_err(serde::de::Error::custom)
42 })
43 .transpose()
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
47#[serde(transparent)]
48pub struct TriggerEventId(pub String);
49
50impl TriggerEventId {
51 pub fn new() -> Self {
52 Self(format!("trigger_evt_{}", Uuid::now_v7()))
53 }
54}
55
56impl Default for TriggerEventId {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
63#[serde(transparent)]
64pub struct ProviderId(pub String);
65
66impl ProviderId {
67 pub fn new(value: impl Into<String>) -> Self {
68 Self(value.into())
69 }
70
71 pub fn as_str(&self) -> &str {
72 self.0.as_str()
73 }
74}
75
76impl From<&str> for ProviderId {
77 fn from(value: &str) -> Self {
78 Self::new(value)
79 }
80}
81
82impl From<String> for ProviderId {
83 fn from(value: String) -> Self {
84 Self::new(value)
85 }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
89#[serde(transparent)]
90pub struct TraceId(pub String);
91
92impl TraceId {
93 pub fn new() -> Self {
94 Self(format!("trace_{}", Uuid::now_v7()))
95 }
96}
97
98impl Default for TraceId {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
105#[serde(transparent)]
106pub struct TenantId(pub String);
107
108impl TenantId {
109 pub fn new(value: impl Into<String>) -> Self {
110 Self(value.into())
111 }
112}
113
/// Signature state recorded on a [`TriggerEvent`].
///
/// Serialized as an internally tagged object: the `state` key holds the
/// snake_case variant name (`verified`, `unsigned`, `failed`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum SignatureStatus {
    /// Signature was checked and accepted (presumably by the provider's verifier — confirm at the call site).
    Verified,
    /// No signature was available for the event (presumed meaning — confirm at the call site).
    Unsigned,
    /// Signature handling failed; `reason` carries a free-form explanation.
    Failed { reason: String },
}
121
/// Fields shared by every GitHub payload variant.
///
/// Each typed GitHub payload struct embeds this via `#[serde(flatten)]`, so
/// these keys appear at the top level of the serialized payload. It is also
/// used directly as the `Other` variant of [`GitHubEventPayload`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubEventCommon {
    /// Event name (required).
    pub event: String,
    /// Optional action string associated with the event.
    pub action: Option<String>,
    /// Optional delivery identifier.
    pub delivery_id: Option<String>,
    /// Optional installation identifier.
    pub installation_id: Option<i64>,
    /// Untyped JSON kept alongside the typed fields (presumably the source webhook body — confirm with the normalizer).
    pub raw: JsonValue,
}
130
/// GitHub payload carrying an `issue` object in addition to the common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubIssuesEventPayload {
    /// Shared GitHub fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    /// Untyped issue JSON (required on deserialization).
    pub issue: JsonValue,
}
137
/// GitHub payload carrying a `pull_request` object in addition to the common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPullRequestEventPayload {
    /// Shared GitHub fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    /// Untyped pull request JSON (required on deserialization).
    pub pull_request: JsonValue,
}
144
/// GitHub payload carrying both an `issue` and a `comment` object.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubIssueCommentEventPayload {
    /// Shared GitHub fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    /// Untyped issue JSON (required on deserialization).
    pub issue: JsonValue,
    /// Untyped comment JSON (required on deserialization).
    pub comment: JsonValue,
}
152
/// GitHub payload carrying both a `pull_request` and a `review` object.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPullRequestReviewEventPayload {
    /// Shared GitHub fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    /// Untyped pull request JSON (required on deserialization).
    pub pull_request: JsonValue,
    /// Untyped review JSON (required on deserialization).
    pub review: JsonValue,
}
160
/// GitHub payload carrying a list of commits in addition to the common fields.
///
/// Both extra fields are optional on deserialization: `commits` falls back to
/// an empty vector and `distinct_size` to `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPushEventPayload {
    /// Shared GitHub fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    /// Untyped commit entries; defaults to empty when the key is absent.
    #[serde(default)]
    pub commits: Vec<JsonValue>,
    /// Optional count (presumably of distinct commits — confirm with the normalizer).
    pub distinct_size: Option<i64>,
}
169
/// GitHub payload carrying a `workflow_run` object in addition to the common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubWorkflowRunEventPayload {
    /// Shared GitHub fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    /// Untyped workflow run JSON (required on deserialization).
    pub workflow_run: JsonValue,
}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178#[serde(untagged)]
179pub enum GitHubEventPayload {
180 Issues(GitHubIssuesEventPayload),
181 PullRequest(GitHubPullRequestEventPayload),
182 IssueComment(GitHubIssueCommentEventPayload),
183 PullRequestReview(GitHubPullRequestReviewEventPayload),
184 Push(GitHubPushEventPayload),
185 WorkflowRun(GitHubWorkflowRunEventPayload),
186 Other(GitHubEventCommon),
187}
188
/// Fields shared by every Slack payload variant.
///
/// Each typed Slack payload struct embeds this via `#[serde(flatten)]`; it is
/// also used directly as the `Other` variant of [`SlackEventPayload`]. Only
/// `event` and `raw` are required on deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackEventCommon {
    /// Event name (required).
    pub event: String,
    pub event_id: Option<String>,
    pub api_app_id: Option<String>,
    pub team_id: Option<String>,
    pub channel_id: Option<String>,
    pub user_id: Option<String>,
    pub event_ts: Option<String>,
    /// Untyped JSON kept alongside the typed fields (presumably the source event body — confirm with the normalizer).
    pub raw: JsonValue,
}
200
/// Slack message payload: the common fields plus optional message details.
///
/// Every field besides the flattened common block is optional, so this struct
/// deserializes from any object that satisfies [`SlackEventCommon`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackMessageEventPayload {
    /// Shared Slack fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub subtype: Option<String>,
    pub channel_type: Option<String>,
    pub channel: Option<String>,
    pub user: Option<String>,
    pub text: Option<String>,
    pub ts: Option<String>,
    pub thread_ts: Option<String>,
}
213
/// Slack app-mention payload: the common fields plus optional message details.
///
/// Every field besides the flattened common block is optional.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackAppMentionEventPayload {
    /// Shared Slack fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub channel: Option<String>,
    pub user: Option<String>,
    pub text: Option<String>,
    pub ts: Option<String>,
    pub thread_ts: Option<String>,
}
224
/// Slack reaction-added payload: the common fields plus reaction details.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackReactionAddedEventPayload {
    /// Shared Slack fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub reaction: Option<String>,
    pub item_user: Option<String>,
    /// Untyped JSON for the item (required on deserialization).
    pub item: JsonValue,
}
233
/// Slack app-home-opened payload: the common fields plus view details.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackAppHomeOpenedEventPayload {
    /// Shared Slack fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub user: Option<String>,
    pub channel: Option<String>,
    pub tab: Option<String>,
    /// Untyped view JSON (required on deserialization).
    pub view: JsonValue,
}
243
/// Slack assistant-thread-started payload: the common fields plus thread details.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackAssistantThreadStartedEventPayload {
    /// Shared Slack fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    /// Untyped assistant thread JSON (required on deserialization).
    pub assistant_thread: JsonValue,
    pub thread_ts: Option<String>,
    /// Untyped context JSON (required on deserialization).
    pub context: JsonValue,
}
252
253#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
254#[serde(untagged)]
255pub enum SlackEventPayload {
256 Message(SlackMessageEventPayload),
257 AppMention(SlackAppMentionEventPayload),
258 ReactionAdded(SlackReactionAddedEventPayload),
259 AppHomeOpened(SlackAppHomeOpenedEventPayload),
260 AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
261 Other(SlackEventCommon),
262}
263
/// Fields shared by every Linear payload variant.
///
/// Each typed Linear payload struct embeds this via `#[serde(flatten)]`; it is
/// also used directly as the `Other` variant of [`LinearEventPayload`].
/// `event`, `actor` and `raw` are required on deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearEventCommon {
    /// Event name (required).
    pub event: String,
    pub action: Option<String>,
    pub delivery_id: Option<String>,
    pub organization_id: Option<String>,
    /// Optional numeric timestamp (unit not visible here — confirm with the normalizer).
    pub webhook_timestamp: Option<i64>,
    pub webhook_id: Option<String>,
    pub url: Option<String>,
    /// Optional creation time kept as an unparsed string.
    pub created_at: Option<String>,
    /// Untyped actor JSON.
    pub actor: JsonValue,
    /// Untyped JSON kept alongside the typed fields (presumably the source webhook body — confirm with the normalizer).
    pub raw: JsonValue,
}
277
/// One changed field on a Linear issue, carrying the field's `previous` value.
///
/// Serde support is hand-written rather than derived: each value is
/// represented as an object with a `field_name` discriminator and a `previous`
/// key. `Other` covers field names without a dedicated variant and keeps the
/// original name in `field` together with the untyped previous value.
#[derive(Clone, Debug, PartialEq)]
pub enum LinearIssueChange {
    Title { previous: Option<String> },
    Description { previous: Option<String> },
    Priority { previous: Option<i64> },
    Estimate { previous: Option<i64> },
    StateId { previous: Option<String> },
    TeamId { previous: Option<String> },
    AssigneeId { previous: Option<String> },
    ProjectId { previous: Option<String> },
    CycleId { previous: Option<String> },
    DueDate { previous: Option<String> },
    ParentId { previous: Option<String> },
    SortOrder { previous: Option<f64> },
    LabelIds { previous: Vec<String> },
    CompletedAt { previous: Option<String> },
    /// Change to a field that has no dedicated variant.
    Other { field: String, previous: JsonValue },
}
296
297impl Serialize for LinearIssueChange {
298 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
299 where
300 S: Serializer,
301 {
302 let value = match self {
303 Self::Title { previous } => {
304 serde_json::json!({ "field_name": "title", "previous": previous })
305 }
306 Self::Description { previous } => {
307 serde_json::json!({ "field_name": "description", "previous": previous })
308 }
309 Self::Priority { previous } => {
310 serde_json::json!({ "field_name": "priority", "previous": previous })
311 }
312 Self::Estimate { previous } => {
313 serde_json::json!({ "field_name": "estimate", "previous": previous })
314 }
315 Self::StateId { previous } => {
316 serde_json::json!({ "field_name": "state_id", "previous": previous })
317 }
318 Self::TeamId { previous } => {
319 serde_json::json!({ "field_name": "team_id", "previous": previous })
320 }
321 Self::AssigneeId { previous } => {
322 serde_json::json!({ "field_name": "assignee_id", "previous": previous })
323 }
324 Self::ProjectId { previous } => {
325 serde_json::json!({ "field_name": "project_id", "previous": previous })
326 }
327 Self::CycleId { previous } => {
328 serde_json::json!({ "field_name": "cycle_id", "previous": previous })
329 }
330 Self::DueDate { previous } => {
331 serde_json::json!({ "field_name": "due_date", "previous": previous })
332 }
333 Self::ParentId { previous } => {
334 serde_json::json!({ "field_name": "parent_id", "previous": previous })
335 }
336 Self::SortOrder { previous } => {
337 serde_json::json!({ "field_name": "sort_order", "previous": previous })
338 }
339 Self::LabelIds { previous } => {
340 serde_json::json!({ "field_name": "label_ids", "previous": previous })
341 }
342 Self::CompletedAt { previous } => {
343 serde_json::json!({ "field_name": "completed_at", "previous": previous })
344 }
345 Self::Other { field, previous } => {
346 serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
347 }
348 };
349 value.serialize(serializer)
350 }
351}
352
353impl<'de> Deserialize<'de> for LinearIssueChange {
354 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
355 where
356 D: Deserializer<'de>,
357 {
358 let value = JsonValue::deserialize(deserializer)?;
359 let field_name = value
360 .get("field_name")
361 .and_then(JsonValue::as_str)
362 .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
363 let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
364 Ok(match field_name {
365 "title" => Self::Title {
366 previous: previous.as_str().map(ToString::to_string),
367 },
368 "description" => Self::Description {
369 previous: previous.as_str().map(ToString::to_string),
370 },
371 "priority" => Self::Priority {
372 previous: parse_json_i64ish(&previous),
373 },
374 "estimate" => Self::Estimate {
375 previous: parse_json_i64ish(&previous),
376 },
377 "state_id" => Self::StateId {
378 previous: previous.as_str().map(ToString::to_string),
379 },
380 "team_id" => Self::TeamId {
381 previous: previous.as_str().map(ToString::to_string),
382 },
383 "assignee_id" => Self::AssigneeId {
384 previous: previous.as_str().map(ToString::to_string),
385 },
386 "project_id" => Self::ProjectId {
387 previous: previous.as_str().map(ToString::to_string),
388 },
389 "cycle_id" => Self::CycleId {
390 previous: previous.as_str().map(ToString::to_string),
391 },
392 "due_date" => Self::DueDate {
393 previous: previous.as_str().map(ToString::to_string),
394 },
395 "parent_id" => Self::ParentId {
396 previous: previous.as_str().map(ToString::to_string),
397 },
398 "sort_order" => Self::SortOrder {
399 previous: previous.as_f64(),
400 },
401 "label_ids" => Self::LabelIds {
402 previous: parse_string_array(&previous),
403 },
404 "completed_at" => Self::CompletedAt {
405 previous: previous.as_str().map(ToString::to_string),
406 },
407 "other" => Self::Other {
408 field: value
409 .get("field")
410 .and_then(JsonValue::as_str)
411 .map(ToString::to_string)
412 .unwrap_or_else(|| "unknown".to_string()),
413 previous,
414 },
415 other => Self::Other {
416 field: other.to_string(),
417 previous,
418 },
419 })
420 }
421}
422
/// Linear payload carrying an `issue` object and the list of changed fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearIssueEventPayload {
    /// Shared Linear fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    /// Untyped issue JSON (required on deserialization).
    pub issue: JsonValue,
    /// Field-level changes; defaults to empty when the key is absent.
    #[serde(default)]
    pub changes: Vec<LinearIssueChange>,
}
431
/// Linear payload carrying a `comment` object in addition to the common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearIssueCommentEventPayload {
    /// Shared Linear fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    /// Untyped comment JSON (required on deserialization).
    pub comment: JsonValue,
}
438
/// Linear payload carrying a `label` object in addition to the common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearIssueLabelEventPayload {
    /// Shared Linear fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    /// Untyped label JSON (required on deserialization).
    pub label: JsonValue,
}
445
/// Linear payload carrying a `project` object in addition to the common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearProjectEventPayload {
    /// Shared Linear fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    /// Untyped project JSON (required on deserialization).
    pub project: JsonValue,
}
452
/// Linear payload carrying a `cycle` object in addition to the common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearCycleEventPayload {
    /// Shared Linear fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    /// Untyped cycle JSON (required on deserialization).
    pub cycle: JsonValue,
}
459
/// Linear payload carrying a `customer` object in addition to the common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearCustomerEventPayload {
    /// Shared Linear fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    /// Untyped customer JSON (required on deserialization).
    pub customer: JsonValue,
}
466
/// Linear payload carrying a `customer_request` object in addition to the common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearCustomerRequestEventPayload {
    /// Shared Linear fields, flattened into this object's top level.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    /// Untyped customer request JSON (required on deserialization).
    pub customer_request: JsonValue,
}
473
/// Normalized Linear payload, one variant per supported event family.
///
/// The enum is `#[serde(untagged)]`, so deserialization tries the variants in
/// declaration order and keeps the first one that matches. Each typed variant
/// requires its own distinct key (`issue`, `comment`, `label`, `project`,
/// `cycle`, `customer`, `customer_request`); `Other` requires only the common
/// fields and must stay last so it acts as the fallback.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum LinearEventPayload {
    Issue(LinearIssueEventPayload),
    IssueComment(LinearIssueCommentEventPayload),
    IssueLabel(LinearIssueLabelEventPayload),
    Project(LinearProjectEventPayload),
    Cycle(LinearCycleEventPayload),
    Customer(LinearCustomerEventPayload),
    CustomerRequest(LinearCustomerRequestEventPayload),
    // Fallback: matches any object that carries the common fields.
    Other(LinearEventCommon),
}
486
/// Change record attached to a Notion payload through its `polled` field.
///
/// NOTE(review): the field names suggest this describes a change detected by
/// polling, with optional `before` and required `after` snapshots — confirm
/// against the code that constructs it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct NotionPolledChangeEvent {
    pub resource: String,
    pub source_id: String,
    pub entity_id: String,
    pub high_water_mark: String,
    /// Optional untyped JSON for the earlier state.
    pub before: Option<JsonValue>,
    /// Untyped JSON for the later state (required on deserialization).
    pub after: JsonValue,
}
496
/// Normalized Notion payload.
///
/// Only `event` and `raw` are required on deserialization; every other field
/// is optional.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct NotionEventPayload {
    /// Event name (required).
    pub event: String,
    pub workspace_id: Option<String>,
    pub request_id: Option<String>,
    pub subscription_id: Option<String>,
    pub integration_id: Option<String>,
    pub attempt_number: Option<u32>,
    pub entity_id: Option<String>,
    pub entity_type: Option<String>,
    pub api_version: Option<String>,
    pub verification_token: Option<String>,
    /// Optional change record (see [`NotionPolledChangeEvent`]).
    pub polled: Option<NotionPolledChangeEvent>,
    /// Untyped JSON kept alongside the typed fields (presumably the source body — confirm with the normalizer).
    pub raw: JsonValue,
}
512
/// Payload for the `cron` provider.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct CronEventPayload {
    pub cron_id: Option<String>,
    pub schedule: Option<String>,
    /// Tick time, serialized as an RFC 3339 string.
    #[serde(with = "time::serde::rfc3339")]
    pub tick_at: OffsetDateTime,
    /// Untyped JSON kept alongside the typed fields.
    pub raw: JsonValue,
}
521
/// Payload for the generic `webhook` provider.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GenericWebhookPayload {
    pub source: Option<String>,
    pub content_type: Option<String>,
    /// Untyped JSON body (required on deserialization).
    pub raw: JsonValue,
}
528
/// Payload for the `a2a-push` provider.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct A2aPushPayload {
    pub task_id: Option<String>,
    pub sender: Option<String>,
    /// Untyped JSON body (required on deserialization).
    pub raw: JsonValue,
}
535
/// Payload for a provider that has no dedicated variant in [`KnownProviderPayload`].
///
/// All three fields are required on deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ExtensionProviderPayload {
    /// Provider name; returned by `ProviderPayload::provider` for this variant.
    pub provider: String,
    /// Name of the schema describing `raw`.
    pub schema_name: String,
    /// Untyped JSON body.
    pub raw: JsonValue,
}
542
543#[allow(clippy::large_enum_variant)]
544#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
545#[serde(untagged)]
546pub enum ProviderPayload {
547 Known(KnownProviderPayload),
548 Extension(ExtensionProviderPayload),
549}
550
551impl ProviderPayload {
552 pub fn provider(&self) -> &str {
553 match self {
554 Self::Known(known) => known.provider(),
555 Self::Extension(payload) => payload.provider.as_str(),
556 }
557 }
558
559 pub fn normalize(
560 provider: &ProviderId,
561 kind: &str,
562 headers: &BTreeMap<String, String>,
563 raw: JsonValue,
564 ) -> Result<Self, ProviderCatalogError> {
565 provider_catalog()
566 .read()
567 .expect("provider catalog poisoned")
568 .normalize(provider, kind, headers, raw)
569 }
570}
571
572#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
573#[serde(tag = "provider")]
574pub enum KnownProviderPayload {
575 #[serde(rename = "github")]
576 GitHub(GitHubEventPayload),
577 #[serde(rename = "slack")]
578 Slack(Box<SlackEventPayload>),
579 #[serde(rename = "linear")]
580 Linear(LinearEventPayload),
581 #[serde(rename = "notion")]
582 Notion(Box<NotionEventPayload>),
583 #[serde(rename = "cron")]
584 Cron(CronEventPayload),
585 #[serde(rename = "webhook")]
586 Webhook(GenericWebhookPayload),
587 #[serde(rename = "a2a-push")]
588 A2aPush(A2aPushPayload),
589}
590
591impl KnownProviderPayload {
592 pub fn provider(&self) -> &str {
593 match self {
594 Self::GitHub(_) => "github",
595 Self::Slack(_) => "slack",
596 Self::Linear(_) => "linear",
597 Self::Notion(_) => "notion",
598 Self::Cron(_) => "cron",
599 Self::Webhook(_) => "webhook",
600 Self::A2aPush(_) => "a2a-push",
601 }
602 }
603}
604
605#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
606pub struct TriggerEvent {
607 pub id: TriggerEventId,
608 pub provider: ProviderId,
609 pub kind: String,
610 #[serde(with = "time::serde::rfc3339")]
611 pub received_at: OffsetDateTime,
612 #[serde(with = "time::serde::rfc3339::option")]
613 pub occurred_at: Option<OffsetDateTime>,
614 pub dedupe_key: String,
615 pub trace_id: TraceId,
616 pub tenant_id: Option<TenantId>,
617 pub headers: BTreeMap<String, String>,
618 #[serde(default, skip_serializing_if = "Option::is_none")]
619 pub batch: Option<Vec<JsonValue>>,
620 #[serde(
621 default,
622 skip_serializing_if = "Option::is_none",
623 serialize_with = "serialize_optional_bytes_b64",
624 deserialize_with = "deserialize_optional_bytes_b64"
625 )]
626 pub raw_body: Option<Vec<u8>>,
627 pub provider_payload: ProviderPayload,
628 pub signature_status: SignatureStatus,
629 #[serde(skip)]
630 pub dedupe_claimed: bool,
631}
632
633impl TriggerEvent {
634 #[allow(clippy::too_many_arguments)]
635 pub fn new(
636 provider: ProviderId,
637 kind: impl Into<String>,
638 occurred_at: Option<OffsetDateTime>,
639 dedupe_key: impl Into<String>,
640 tenant_id: Option<TenantId>,
641 headers: BTreeMap<String, String>,
642 provider_payload: ProviderPayload,
643 signature_status: SignatureStatus,
644 ) -> Self {
645 Self {
646 id: TriggerEventId::new(),
647 provider,
648 kind: kind.into(),
649 received_at: clock::now_utc(),
650 occurred_at,
651 dedupe_key: dedupe_key.into(),
652 trace_id: TraceId::new(),
653 tenant_id,
654 headers,
655 batch: None,
656 raw_body: None,
657 provider_payload,
658 signature_status,
659 dedupe_claimed: false,
660 }
661 }
662
663 pub fn dedupe_claimed(&self) -> bool {
664 self.dedupe_claimed
665 }
666
667 pub fn mark_dedupe_claimed(&mut self) {
668 self.dedupe_claimed = true;
669 }
670}
671
/// Decides which header values are masked by `redact_headers`.
///
/// Header names are compared case-insensitively (ASCII). A header is redacted
/// only when it is not on a keep-list and its name contains a sensitive
/// fragment.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRedactionPolicy {
    /// Lower-cased header names that are always kept.
    safe_exact_names: BTreeSet<String>,
}

impl HeaderRedactionPolicy {
    /// Adds `name` (lower-cased) to the set of headers that are never redacted.
    pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
        let normalized = name.into().to_ascii_lowercase();
        self.safe_exact_names.insert(normalized);
        self
    }

    /// Returns `true` when the header must be left untouched.
    ///
    /// A header is kept when it is in the configured safe set, is one of the
    /// built-in safe names, ends with `-event` or `-delivery`, or contains
    /// `timestamp` or `request-id`.
    fn should_keep(&self, name: &str) -> bool {
        const BUILTIN_SAFE: &[&str] = &[
            "user-agent",
            "request-id",
            "x-request-id",
            "x-correlation-id",
            "content-type",
            "content-length",
            "x-github-event",
            "x-github-delivery",
            "x-github-hook-id",
            "x-hub-signature-256",
            "x-slack-request-timestamp",
            "x-slack-signature",
            "x-linear-signature",
            "x-notion-signature",
            "x-a2a-signature",
            "x-a2a-delivery",
        ];

        let lowered = name.to_ascii_lowercase();
        let header = lowered.as_str();

        self.safe_exact_names.contains(header)
            || BUILTIN_SAFE.iter().any(|safe| *safe == header)
            || header.ends_with("-event")
            || header.ends_with("-delivery")
            || header.contains("timestamp")
            || header.contains("request-id")
    }

    /// Returns `true` when the header value must be replaced.
    ///
    /// Kept headers are never redacted; otherwise a name containing
    /// `authorization`, `cookie`, `secret`, `token` or `key` is redacted.
    fn should_redact(&self, name: &str) -> bool {
        const SENSITIVE_FRAGMENTS: &[&str] = &["authorization", "cookie", "secret", "token", "key"];

        let header = name.to_ascii_lowercase();
        !self.should_keep(header.as_str())
            && SENSITIVE_FRAGMENTS
                .iter()
                .any(|fragment| header.contains(*fragment))
    }
}

impl Default for HeaderRedactionPolicy {
    /// Policy whose safe set is pre-populated with the built-in safe header names.
    fn default() -> Self {
        let defaults = [
            "content-length",
            "content-type",
            "request-id",
            "user-agent",
            "x-a2a-delivery",
            "x-a2a-signature",
            "x-correlation-id",
            "x-github-delivery",
            "x-github-event",
            "x-github-hook-id",
            "x-hub-signature-256",
            "x-linear-signature",
            "x-notion-signature",
            "x-request-id",
            "x-slack-request-timestamp",
            "x-slack-signature",
        ];
        Self {
            safe_exact_names: defaults.iter().map(|name| name.to_string()).collect(),
        }
    }
}
750
751pub fn redact_headers(
752 headers: &BTreeMap<String, String>,
753 policy: &HeaderRedactionPolicy,
754) -> BTreeMap<String, String> {
755 headers
756 .iter()
757 .map(|(name, value)| {
758 if policy.should_redact(name) {
759 (name.clone(), REDACTED_HEADER_VALUE.to_string())
760 } else {
761 (name.clone(), value.clone())
762 }
763 })
764 .collect()
765}
766
/// A secret that a provider declares in its metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderSecretRequirement {
    /// Secret name.
    pub name: String,
    /// Whether the secret is mandatory; `required_secret_names` lists only these.
    pub required: bool,
    /// Namespace the secret name belongs to.
    pub namespace: String,
}
773
/// Name of an outbound method that a provider lists in its metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderOutboundMethod {
    /// Method name, e.g. `post_message`.
    pub name: String,
}
778
/// Describes how a provider's inbound requests are signed.
///
/// Serialized as an internally tagged object whose `kind` key holds the
/// snake_case variant name. Defaults to `None`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum SignatureVerificationMetadata {
    /// No signature verification is described.
    #[default]
    None,
    /// HMAC-based signature scheme.
    Hmac {
        /// Name of the signature variant (e.g. `github`, `slack`).
        variant: String,
        /// NOTE(review): presumably whether the signature covers the unparsed request body — confirm with the verifier.
        raw_body: bool,
        /// Header that carries the signature.
        signature_header: String,
        /// Optional header that carries a timestamp.
        timestamp_header: Option<String>,
        /// Optional header that carries a delivery/request id.
        id_header: Option<String>,
        /// Optional default tolerance, in seconds.
        default_tolerance_secs: Option<i64>,
        /// Digest name (e.g. `sha256`).
        digest: String,
        /// Signature encoding name (e.g. `hex`).
        encoding: String,
    },
}
795
/// Describes the runtime backing a provider.
///
/// Serialized as an internally tagged object whose `kind` key holds the
/// snake_case variant name. Defaults to `Placeholder`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ProviderRuntimeMetadata {
    /// Provider served by a built-in connector.
    Builtin {
        /// Connector name.
        connector: String,
        /// Optional name of the signature variant used by default.
        default_signature_variant: Option<String>,
    },
    /// No runtime information is available.
    #[default]
    Placeholder,
}
806
807#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
808pub struct ProviderMetadata {
809 pub provider: String,
810 #[serde(default)]
811 pub kinds: Vec<String>,
812 pub schema_name: String,
813 #[serde(default)]
814 pub outbound_methods: Vec<ProviderOutboundMethod>,
815 #[serde(default)]
816 pub secret_requirements: Vec<ProviderSecretRequirement>,
817 #[serde(default)]
818 pub signature_verification: SignatureVerificationMetadata,
819 #[serde(default)]
820 pub runtime: ProviderRuntimeMetadata,
821}
822
823impl ProviderMetadata {
824 pub fn supports_kind(&self, kind: &str) -> bool {
825 self.kinds.iter().any(|candidate| candidate == kind)
826 }
827
828 pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
829 self.secret_requirements
830 .iter()
831 .filter(|requirement| requirement.required)
832 .map(|requirement| requirement.name.as_str())
833 }
834}
835
836pub trait ProviderSchema: Send + Sync {
837 fn provider_id(&self) -> &'static str;
838 fn harn_schema_name(&self) -> &'static str;
839 fn metadata(&self) -> ProviderMetadata {
840 ProviderMetadata {
841 provider: self.provider_id().to_string(),
842 schema_name: self.harn_schema_name().to_string(),
843 ..ProviderMetadata::default()
844 }
845 }
846 fn normalize(
847 &self,
848 kind: &str,
849 headers: &BTreeMap<String, String>,
850 raw: JsonValue,
851 ) -> Result<ProviderPayload, ProviderCatalogError>;
852}
853
/// Errors reported by [`ProviderCatalog`] operations; each carries the provider name.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProviderCatalogError {
    /// A schema with this provider name is already registered.
    DuplicateProvider(String),
    /// No schema is registered under this provider name.
    UnknownProvider(String),
}

impl std::fmt::Display for ProviderCatalogError {
    /// Writes a one-line, human-readable description naming the provider.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ProviderCatalogError::UnknownProvider(provider) => {
                f.write_fmt(format_args!("provider `{provider}` is not registered"))
            }
            ProviderCatalogError::DuplicateProvider(provider) => {
                f.write_fmt(format_args!("provider `{provider}` is already registered"))
            }
        }
    }
}

impl std::error::Error for ProviderCatalogError {}
872
873#[derive(Clone, Default)]
874pub struct ProviderCatalog {
875 providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
876}
877
878impl ProviderCatalog {
879 pub fn with_defaults() -> Self {
880 let mut catalog = Self::default();
881 for schema in default_provider_schemas() {
882 catalog
883 .register(schema)
884 .expect("default providers must register cleanly");
885 }
886 catalog
887 }
888
889 pub fn register(
890 &mut self,
891 schema: Arc<dyn ProviderSchema>,
892 ) -> Result<(), ProviderCatalogError> {
893 let provider = schema.provider_id().to_string();
894 if self.providers.contains_key(provider.as_str()) {
895 return Err(ProviderCatalogError::DuplicateProvider(provider));
896 }
897 self.providers.insert(provider, schema);
898 Ok(())
899 }
900
901 pub fn normalize(
902 &self,
903 provider: &ProviderId,
904 kind: &str,
905 headers: &BTreeMap<String, String>,
906 raw: JsonValue,
907 ) -> Result<ProviderPayload, ProviderCatalogError> {
908 let schema = self
909 .providers
910 .get(provider.as_str())
911 .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
912 schema.normalize(kind, headers, raw)
913 }
914
915 pub fn schema_names(&self) -> BTreeMap<String, String> {
916 self.providers
917 .iter()
918 .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
919 .collect()
920 }
921
922 pub fn entries(&self) -> Vec<ProviderMetadata> {
923 self.providers
924 .values()
925 .map(|schema| schema.metadata())
926 .collect()
927 }
928
929 pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
930 self.providers.get(provider).map(|schema| schema.metadata())
931 }
932}
933
934pub fn register_provider_schema(
935 schema: Arc<dyn ProviderSchema>,
936) -> Result<(), ProviderCatalogError> {
937 provider_catalog()
938 .write()
939 .expect("provider catalog poisoned")
940 .register(schema)
941}
942
943pub fn reset_provider_catalog() {
944 *provider_catalog()
945 .write()
946 .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
947}
948
949pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
950 provider_catalog()
951 .read()
952 .expect("provider catalog poisoned")
953 .schema_names()
954}
955
956pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
957 provider_catalog()
958 .read()
959 .expect("provider catalog poisoned")
960 .entries()
961}
962
963pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
964 provider_catalog()
965 .read()
966 .expect("provider catalog poisoned")
967 .metadata_for(provider)
968}
969
970fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
971 static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
972 PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
973}
974
975struct BuiltinProviderSchema {
976 provider_id: &'static str,
977 harn_schema_name: &'static str,
978 metadata: ProviderMetadata,
979 normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
980}
981
982impl ProviderSchema for BuiltinProviderSchema {
983 fn provider_id(&self) -> &'static str {
984 self.provider_id
985 }
986
987 fn harn_schema_name(&self) -> &'static str {
988 self.harn_schema_name
989 }
990
991 fn metadata(&self) -> ProviderMetadata {
992 self.metadata.clone()
993 }
994
995 fn normalize(
996 &self,
997 kind: &str,
998 headers: &BTreeMap<String, String>,
999 raw: JsonValue,
1000 ) -> Result<ProviderPayload, ProviderCatalogError> {
1001 Ok((self.normalize)(kind, headers, raw))
1002 }
1003}
1004
1005fn provider_metadata_entry(
1006 provider: &str,
1007 kinds: &[&str],
1008 schema_name: &str,
1009 outbound_methods: &[&str],
1010 signature_verification: SignatureVerificationMetadata,
1011 secret_requirements: Vec<ProviderSecretRequirement>,
1012 runtime: ProviderRuntimeMetadata,
1013) -> ProviderMetadata {
1014 ProviderMetadata {
1015 provider: provider.to_string(),
1016 kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
1017 schema_name: schema_name.to_string(),
1018 outbound_methods: outbound_methods
1019 .iter()
1020 .map(|name| ProviderOutboundMethod {
1021 name: (*name).to_string(),
1022 })
1023 .collect(),
1024 secret_requirements,
1025 signature_verification,
1026 runtime,
1027 }
1028}
1029
1030fn hmac_signature_metadata(
1031 variant: &str,
1032 signature_header: &str,
1033 timestamp_header: Option<&str>,
1034 id_header: Option<&str>,
1035 default_tolerance_secs: Option<i64>,
1036 encoding: &str,
1037) -> SignatureVerificationMetadata {
1038 SignatureVerificationMetadata::Hmac {
1039 variant: variant.to_string(),
1040 raw_body: true,
1041 signature_header: signature_header.to_string(),
1042 timestamp_header: timestamp_header.map(ToString::to_string),
1043 id_header: id_header.map(ToString::to_string),
1044 default_tolerance_secs,
1045 digest: "sha256".to_string(),
1046 encoding: encoding.to_string(),
1047 }
1048}
1049
1050fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1051 ProviderSecretRequirement {
1052 name: name.to_string(),
1053 required: true,
1054 namespace: namespace.to_string(),
1055 }
1056}
1057
1058fn outbound_method(name: &str) -> ProviderOutboundMethod {
1059 ProviderOutboundMethod {
1060 name: name.to_string(),
1061 }
1062}
1063
1064fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1065 ProviderSecretRequirement {
1066 name: name.to_string(),
1067 required: false,
1068 namespace: namespace.to_string(),
1069 }
1070}
1071
1072fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1073 vec![
1074 Arc::new(BuiltinProviderSchema {
1075 provider_id: "github",
1076 harn_schema_name: "GitHubEventPayload",
1077 metadata: provider_metadata_entry(
1078 "github",
1079 &["webhook"],
1080 "GitHubEventPayload",
1081 &[],
1082 hmac_signature_metadata(
1083 "github",
1084 "X-Hub-Signature-256",
1085 None,
1086 Some("X-GitHub-Delivery"),
1087 None,
1088 "hex",
1089 ),
1090 vec![required_secret("signing_secret", "github")],
1091 ProviderRuntimeMetadata::Builtin {
1092 connector: "webhook".to_string(),
1093 default_signature_variant: Some("github".to_string()),
1094 },
1095 ),
1096 normalize: github_payload,
1097 }),
1098 Arc::new(BuiltinProviderSchema {
1099 provider_id: "slack",
1100 harn_schema_name: "SlackEventPayload",
1101 metadata: provider_metadata_entry(
1102 "slack",
1103 &["webhook"],
1104 "SlackEventPayload",
1105 &[
1106 "post_message",
1107 "update_message",
1108 "add_reaction",
1109 "open_view",
1110 "user_info",
1111 "api_call",
1112 "upload_file",
1113 ],
1114 hmac_signature_metadata(
1115 "slack",
1116 "X-Slack-Signature",
1117 Some("X-Slack-Request-Timestamp"),
1118 None,
1119 Some(300),
1120 "hex",
1121 ),
1122 vec![required_secret("signing_secret", "slack")],
1123 ProviderRuntimeMetadata::Builtin {
1124 connector: "slack".to_string(),
1125 default_signature_variant: Some("slack".to_string()),
1126 },
1127 ),
1128 normalize: slack_payload,
1129 }),
1130 Arc::new(BuiltinProviderSchema {
1131 provider_id: "linear",
1132 harn_schema_name: "LinearEventPayload",
1133 metadata: {
1134 let mut metadata = provider_metadata_entry(
1135 "linear",
1136 &["webhook"],
1137 "LinearEventPayload",
1138 &[],
1139 hmac_signature_metadata(
1140 "linear",
1141 "Linear-Signature",
1142 None,
1143 Some("Linear-Delivery"),
1144 Some(75),
1145 "hex",
1146 ),
1147 vec![
1148 required_secret("signing_secret", "linear"),
1149 optional_secret("access_token", "linear"),
1150 ],
1151 ProviderRuntimeMetadata::Builtin {
1152 connector: "linear".to_string(),
1153 default_signature_variant: Some("linear".to_string()),
1154 },
1155 );
1156 metadata.outbound_methods = vec![
1157 ProviderOutboundMethod {
1158 name: "list_issues".to_string(),
1159 },
1160 ProviderOutboundMethod {
1161 name: "update_issue".to_string(),
1162 },
1163 ProviderOutboundMethod {
1164 name: "create_comment".to_string(),
1165 },
1166 ProviderOutboundMethod {
1167 name: "search".to_string(),
1168 },
1169 ProviderOutboundMethod {
1170 name: "graphql".to_string(),
1171 },
1172 ];
1173 metadata
1174 },
1175 normalize: linear_payload,
1176 }),
1177 Arc::new(BuiltinProviderSchema {
1178 provider_id: "notion",
1179 harn_schema_name: "NotionEventPayload",
1180 metadata: {
1181 let mut metadata = provider_metadata_entry(
1182 "notion",
1183 &["webhook", "poll"],
1184 "NotionEventPayload",
1185 &[],
1186 hmac_signature_metadata(
1187 "notion",
1188 "X-Notion-Signature",
1189 None,
1190 None,
1191 None,
1192 "hex",
1193 ),
1194 vec![required_secret("verification_token", "notion")],
1195 ProviderRuntimeMetadata::Builtin {
1196 connector: "notion".to_string(),
1197 default_signature_variant: Some("notion".to_string()),
1198 },
1199 );
1200 metadata.outbound_methods = vec![
1201 outbound_method("get_page"),
1202 outbound_method("update_page"),
1203 outbound_method("append_blocks"),
1204 outbound_method("query_database"),
1205 outbound_method("search"),
1206 outbound_method("create_comment"),
1207 outbound_method("api_call"),
1208 ];
1209 metadata
1210 },
1211 normalize: notion_payload,
1212 }),
1213 Arc::new(BuiltinProviderSchema {
1214 provider_id: "cron",
1215 harn_schema_name: "CronEventPayload",
1216 metadata: provider_metadata_entry(
1217 "cron",
1218 &["cron"],
1219 "CronEventPayload",
1220 &[],
1221 SignatureVerificationMetadata::None,
1222 Vec::new(),
1223 ProviderRuntimeMetadata::Builtin {
1224 connector: "cron".to_string(),
1225 default_signature_variant: None,
1226 },
1227 ),
1228 normalize: cron_payload,
1229 }),
1230 Arc::new(BuiltinProviderSchema {
1231 provider_id: "webhook",
1232 harn_schema_name: "GenericWebhookPayload",
1233 metadata: provider_metadata_entry(
1234 "webhook",
1235 &["webhook"],
1236 "GenericWebhookPayload",
1237 &[],
1238 hmac_signature_metadata(
1239 "standard",
1240 "webhook-signature",
1241 Some("webhook-timestamp"),
1242 Some("webhook-id"),
1243 Some(300),
1244 "base64",
1245 ),
1246 vec![required_secret("signing_secret", "webhook")],
1247 ProviderRuntimeMetadata::Builtin {
1248 connector: "webhook".to_string(),
1249 default_signature_variant: Some("standard".to_string()),
1250 },
1251 ),
1252 normalize: webhook_payload,
1253 }),
1254 Arc::new(BuiltinProviderSchema {
1255 provider_id: "a2a-push",
1256 harn_schema_name: "A2aPushPayload",
1257 metadata: provider_metadata_entry(
1258 "a2a-push",
1259 &["a2a-push"],
1260 "A2aPushPayload",
1261 &[],
1262 SignatureVerificationMetadata::None,
1263 Vec::new(),
1264 ProviderRuntimeMetadata::Placeholder,
1265 ),
1266 normalize: a2a_push_payload,
1267 }),
1268 ]
1269}
1270
1271fn github_payload(
1272 kind: &str,
1273 headers: &BTreeMap<String, String>,
1274 raw: JsonValue,
1275) -> ProviderPayload {
1276 let common = GitHubEventCommon {
1277 event: kind.to_string(),
1278 action: raw
1279 .get("action")
1280 .and_then(JsonValue::as_str)
1281 .map(ToString::to_string),
1282 delivery_id: headers.get("X-GitHub-Delivery").cloned(),
1283 installation_id: raw
1284 .get("installation")
1285 .and_then(|value| value.get("id"))
1286 .and_then(JsonValue::as_i64),
1287 raw: raw.clone(),
1288 };
1289 let payload = match kind {
1290 "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1291 common,
1292 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1293 }),
1294 "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1295 common,
1296 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1297 }),
1298 "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1299 common,
1300 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1301 comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1302 }),
1303 "pull_request_review" => {
1304 GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1305 common,
1306 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1307 review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1308 })
1309 }
1310 "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1311 common,
1312 commits: raw
1313 .get("commits")
1314 .and_then(JsonValue::as_array)
1315 .cloned()
1316 .unwrap_or_default(),
1317 distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1318 }),
1319 "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1320 common,
1321 workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1322 }),
1323 _ => GitHubEventPayload::Other(common),
1324 };
1325 ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1326}
1327
1328fn slack_payload(
1329 kind: &str,
1330 _headers: &BTreeMap<String, String>,
1331 raw: JsonValue,
1332) -> ProviderPayload {
1333 let event = raw.get("event");
1334 let common = SlackEventCommon {
1335 event: kind.to_string(),
1336 event_id: raw
1337 .get("event_id")
1338 .and_then(JsonValue::as_str)
1339 .map(ToString::to_string),
1340 api_app_id: raw
1341 .get("api_app_id")
1342 .and_then(JsonValue::as_str)
1343 .map(ToString::to_string),
1344 team_id: raw
1345 .get("team_id")
1346 .and_then(JsonValue::as_str)
1347 .map(ToString::to_string),
1348 channel_id: slack_channel_id(event),
1349 user_id: slack_user_id(event),
1350 event_ts: event
1351 .and_then(|value| value.get("event_ts"))
1352 .and_then(JsonValue::as_str)
1353 .map(ToString::to_string),
1354 raw: raw.clone(),
1355 };
1356 let payload = match kind {
1357 kind if kind == "message" || kind.starts_with("message.") => {
1358 SlackEventPayload::Message(SlackMessageEventPayload {
1359 subtype: event
1360 .and_then(|value| value.get("subtype"))
1361 .and_then(JsonValue::as_str)
1362 .map(ToString::to_string),
1363 channel_type: event
1364 .and_then(|value| value.get("channel_type"))
1365 .and_then(JsonValue::as_str)
1366 .map(ToString::to_string),
1367 channel: event
1368 .and_then(|value| value.get("channel"))
1369 .and_then(JsonValue::as_str)
1370 .map(ToString::to_string),
1371 user: event
1372 .and_then(|value| value.get("user"))
1373 .and_then(JsonValue::as_str)
1374 .map(ToString::to_string),
1375 text: event
1376 .and_then(|value| value.get("text"))
1377 .and_then(JsonValue::as_str)
1378 .map(ToString::to_string),
1379 ts: event
1380 .and_then(|value| value.get("ts"))
1381 .and_then(JsonValue::as_str)
1382 .map(ToString::to_string),
1383 thread_ts: event
1384 .and_then(|value| value.get("thread_ts"))
1385 .and_then(JsonValue::as_str)
1386 .map(ToString::to_string),
1387 common,
1388 })
1389 }
1390 "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1391 channel: event
1392 .and_then(|value| value.get("channel"))
1393 .and_then(JsonValue::as_str)
1394 .map(ToString::to_string),
1395 user: event
1396 .and_then(|value| value.get("user"))
1397 .and_then(JsonValue::as_str)
1398 .map(ToString::to_string),
1399 text: event
1400 .and_then(|value| value.get("text"))
1401 .and_then(JsonValue::as_str)
1402 .map(ToString::to_string),
1403 ts: event
1404 .and_then(|value| value.get("ts"))
1405 .and_then(JsonValue::as_str)
1406 .map(ToString::to_string),
1407 thread_ts: event
1408 .and_then(|value| value.get("thread_ts"))
1409 .and_then(JsonValue::as_str)
1410 .map(ToString::to_string),
1411 common,
1412 }),
1413 "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1414 reaction: event
1415 .and_then(|value| value.get("reaction"))
1416 .and_then(JsonValue::as_str)
1417 .map(ToString::to_string),
1418 item_user: event
1419 .and_then(|value| value.get("item_user"))
1420 .and_then(JsonValue::as_str)
1421 .map(ToString::to_string),
1422 item: event
1423 .and_then(|value| value.get("item"))
1424 .cloned()
1425 .unwrap_or(JsonValue::Null),
1426 common,
1427 }),
1428 "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1429 user: event
1430 .and_then(|value| value.get("user"))
1431 .and_then(JsonValue::as_str)
1432 .map(ToString::to_string),
1433 channel: event
1434 .and_then(|value| value.get("channel"))
1435 .and_then(JsonValue::as_str)
1436 .map(ToString::to_string),
1437 tab: event
1438 .and_then(|value| value.get("tab"))
1439 .and_then(JsonValue::as_str)
1440 .map(ToString::to_string),
1441 view: event
1442 .and_then(|value| value.get("view"))
1443 .cloned()
1444 .unwrap_or(JsonValue::Null),
1445 common,
1446 }),
1447 "assistant_thread_started" => {
1448 let assistant_thread = event
1449 .and_then(|value| value.get("assistant_thread"))
1450 .cloned()
1451 .unwrap_or(JsonValue::Null);
1452 SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1453 thread_ts: assistant_thread
1454 .get("thread_ts")
1455 .and_then(JsonValue::as_str)
1456 .map(ToString::to_string),
1457 context: assistant_thread
1458 .get("context")
1459 .cloned()
1460 .unwrap_or(JsonValue::Null),
1461 assistant_thread,
1462 common,
1463 })
1464 }
1465 _ => SlackEventPayload::Other(common),
1466 };
1467 ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1468}
1469
1470fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1471 event
1472 .and_then(|value| value.get("channel"))
1473 .and_then(JsonValue::as_str)
1474 .map(ToString::to_string)
1475 .or_else(|| {
1476 event
1477 .and_then(|value| value.get("item"))
1478 .and_then(|value| value.get("channel"))
1479 .and_then(JsonValue::as_str)
1480 .map(ToString::to_string)
1481 })
1482 .or_else(|| {
1483 event
1484 .and_then(|value| value.get("channel"))
1485 .and_then(|value| value.get("id"))
1486 .and_then(JsonValue::as_str)
1487 .map(ToString::to_string)
1488 })
1489 .or_else(|| {
1490 event
1491 .and_then(|value| value.get("assistant_thread"))
1492 .and_then(|value| value.get("channel_id"))
1493 .and_then(JsonValue::as_str)
1494 .map(ToString::to_string)
1495 })
1496}
1497
1498fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1499 event
1500 .and_then(|value| value.get("user"))
1501 .and_then(JsonValue::as_str)
1502 .map(ToString::to_string)
1503 .or_else(|| {
1504 event
1505 .and_then(|value| value.get("user"))
1506 .and_then(|value| value.get("id"))
1507 .and_then(JsonValue::as_str)
1508 .map(ToString::to_string)
1509 })
1510 .or_else(|| {
1511 event
1512 .and_then(|value| value.get("item_user"))
1513 .and_then(JsonValue::as_str)
1514 .map(ToString::to_string)
1515 })
1516 .or_else(|| {
1517 event
1518 .and_then(|value| value.get("assistant_thread"))
1519 .and_then(|value| value.get("user_id"))
1520 .and_then(JsonValue::as_str)
1521 .map(ToString::to_string)
1522 })
1523}
1524
1525fn linear_payload(
1526 _kind: &str,
1527 headers: &BTreeMap<String, String>,
1528 raw: JsonValue,
1529) -> ProviderPayload {
1530 let common = linear_event_common(headers, &raw);
1531 let event = common.event.clone();
1532 let payload = match event.as_str() {
1533 "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1534 common,
1535 issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1536 changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1537 }),
1538 "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1539 common,
1540 comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1541 }),
1542 "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1543 common,
1544 label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1545 }),
1546 "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1547 common,
1548 project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1549 }),
1550 "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1551 common,
1552 cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1553 }),
1554 "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
1555 common,
1556 customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1557 }),
1558 "customer_request" => {
1559 LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
1560 common,
1561 customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1562 })
1563 }
1564 _ => LinearEventPayload::Other(common),
1565 };
1566 ProviderPayload::Known(KnownProviderPayload::Linear(payload))
1567}
1568
1569fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
1570 LinearEventCommon {
1571 event: linear_event_name(
1572 raw.get("type")
1573 .and_then(JsonValue::as_str)
1574 .or_else(|| headers.get("Linear-Event").map(String::as_str)),
1575 ),
1576 action: raw
1577 .get("action")
1578 .and_then(JsonValue::as_str)
1579 .map(ToString::to_string),
1580 delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
1581 organization_id: raw
1582 .get("organizationId")
1583 .and_then(JsonValue::as_str)
1584 .map(ToString::to_string),
1585 webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
1586 webhook_id: raw
1587 .get("webhookId")
1588 .and_then(JsonValue::as_str)
1589 .map(ToString::to_string),
1590 url: raw
1591 .get("url")
1592 .and_then(JsonValue::as_str)
1593 .map(ToString::to_string),
1594 created_at: raw
1595 .get("createdAt")
1596 .and_then(JsonValue::as_str)
1597 .map(ToString::to_string),
1598 actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
1599 raw: raw.clone(),
1600 }
1601}
1602
/// Maps a Linear event type onto its canonical lowercase name.
///
/// Matching is ASCII case-insensitive. Known aliases collapse to one name
/// (for example `IssueComment` becomes `comment` and `ProjectUpdate` becomes
/// `project`); any other non-empty type is returned lowercased, and a missing
/// or empty type yields `other`.
fn linear_event_name(raw_type: Option<&str>) -> String {
    let lowered = raw_type.map(str::to_ascii_lowercase).unwrap_or_default();
    let canonical = match lowered.as_str() {
        "" => "other",
        "comment" | "issuecomment" | "issue_comment" => "comment",
        "issuelabel" | "issue_label" => "issue_label",
        "project" | "projectupdate" | "project_update" => "project",
        "customerrequest" | "customer_request" => "customer_request",
        // Already-canonical names (`issue`, `cycle`, `customer`) and unknown
        // types pass through unchanged.
        passthrough => passthrough,
    };
    canonical.to_string()
}
1616
1617fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
1618 let Some(JsonValue::Object(fields)) = updated_from else {
1619 return Vec::new();
1620 };
1621 let mut changes = Vec::new();
1622 for (field, previous) in fields {
1623 let change = match field.as_str() {
1624 "title" => LinearIssueChange::Title {
1625 previous: previous.as_str().map(ToString::to_string),
1626 },
1627 "description" => LinearIssueChange::Description {
1628 previous: previous.as_str().map(ToString::to_string),
1629 },
1630 "priority" => LinearIssueChange::Priority {
1631 previous: parse_json_i64ish(previous),
1632 },
1633 "estimate" => LinearIssueChange::Estimate {
1634 previous: parse_json_i64ish(previous),
1635 },
1636 "stateId" => LinearIssueChange::StateId {
1637 previous: previous.as_str().map(ToString::to_string),
1638 },
1639 "teamId" => LinearIssueChange::TeamId {
1640 previous: previous.as_str().map(ToString::to_string),
1641 },
1642 "assigneeId" => LinearIssueChange::AssigneeId {
1643 previous: previous.as_str().map(ToString::to_string),
1644 },
1645 "projectId" => LinearIssueChange::ProjectId {
1646 previous: previous.as_str().map(ToString::to_string),
1647 },
1648 "cycleId" => LinearIssueChange::CycleId {
1649 previous: previous.as_str().map(ToString::to_string),
1650 },
1651 "dueDate" => LinearIssueChange::DueDate {
1652 previous: previous.as_str().map(ToString::to_string),
1653 },
1654 "parentId" => LinearIssueChange::ParentId {
1655 previous: previous.as_str().map(ToString::to_string),
1656 },
1657 "sortOrder" => LinearIssueChange::SortOrder {
1658 previous: previous.as_f64(),
1659 },
1660 "labelIds" => LinearIssueChange::LabelIds {
1661 previous: parse_string_array(previous),
1662 },
1663 "completedAt" => LinearIssueChange::CompletedAt {
1664 previous: previous.as_str().map(ToString::to_string),
1665 },
1666 _ => LinearIssueChange::Other {
1667 field: field.clone(),
1668 previous: previous.clone(),
1669 },
1670 };
1671 changes.push(change);
1672 }
1673 changes
1674}
1675
1676fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
1677 value
1678 .as_i64()
1679 .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
1680 .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
1681}
1682
1683fn parse_string_array(value: &JsonValue) -> Vec<String> {
1684 let Some(array) = value.as_array() else {
1685 return Vec::new();
1686 };
1687 array
1688 .iter()
1689 .filter_map(|entry| {
1690 entry.as_str().map(ToString::to_string).or_else(|| {
1691 entry
1692 .get("id")
1693 .and_then(JsonValue::as_str)
1694 .map(ToString::to_string)
1695 })
1696 })
1697 .collect()
1698}
1699
/// Looks up a header by name, ignoring ASCII case.
///
/// Scans the map in key order and returns the value of the first key that
/// matches `name`, or `None` when no key matches.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    headers
        .iter()
        .find_map(|(key, value)| key.eq_ignore_ascii_case(name).then_some(value.as_str()))
}
1706
1707fn notion_payload(
1708 kind: &str,
1709 headers: &BTreeMap<String, String>,
1710 raw: JsonValue,
1711) -> ProviderPayload {
1712 let workspace_id = raw
1713 .get("workspace_id")
1714 .and_then(JsonValue::as_str)
1715 .map(ToString::to_string);
1716 ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
1717 event: kind.to_string(),
1718 workspace_id,
1719 request_id: headers
1720 .get("request-id")
1721 .cloned()
1722 .or_else(|| headers.get("x-request-id").cloned()),
1723 subscription_id: raw
1724 .get("subscription_id")
1725 .and_then(JsonValue::as_str)
1726 .map(ToString::to_string),
1727 integration_id: raw
1728 .get("integration_id")
1729 .and_then(JsonValue::as_str)
1730 .map(ToString::to_string),
1731 attempt_number: raw
1732 .get("attempt_number")
1733 .and_then(JsonValue::as_u64)
1734 .and_then(|value| u32::try_from(value).ok()),
1735 entity_id: raw
1736 .get("entity")
1737 .and_then(|value| value.get("id"))
1738 .and_then(JsonValue::as_str)
1739 .map(ToString::to_string),
1740 entity_type: raw
1741 .get("entity")
1742 .and_then(|value| value.get("type"))
1743 .and_then(JsonValue::as_str)
1744 .map(ToString::to_string),
1745 api_version: raw
1746 .get("api_version")
1747 .and_then(JsonValue::as_str)
1748 .map(ToString::to_string),
1749 verification_token: raw
1750 .get("verification_token")
1751 .and_then(JsonValue::as_str)
1752 .map(ToString::to_string),
1753 polled: None,
1754 raw,
1755 })))
1756}
1757
1758fn cron_payload(
1759 _kind: &str,
1760 _headers: &BTreeMap<String, String>,
1761 raw: JsonValue,
1762) -> ProviderPayload {
1763 let cron_id = raw
1764 .get("cron_id")
1765 .and_then(JsonValue::as_str)
1766 .map(ToString::to_string);
1767 let schedule = raw
1768 .get("schedule")
1769 .and_then(JsonValue::as_str)
1770 .map(ToString::to_string);
1771 let tick_at = raw
1772 .get("tick_at")
1773 .and_then(JsonValue::as_str)
1774 .and_then(parse_rfc3339)
1775 .unwrap_or_else(OffsetDateTime::now_utc);
1776 ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
1777 cron_id,
1778 schedule,
1779 tick_at,
1780 raw,
1781 }))
1782}
1783
1784fn webhook_payload(
1785 _kind: &str,
1786 headers: &BTreeMap<String, String>,
1787 raw: JsonValue,
1788) -> ProviderPayload {
1789 ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1790 source: headers.get("X-Webhook-Source").cloned(),
1791 content_type: headers.get("Content-Type").cloned(),
1792 raw,
1793 }))
1794}
1795
1796fn a2a_push_payload(
1797 _kind: &str,
1798 _headers: &BTreeMap<String, String>,
1799 raw: JsonValue,
1800) -> ProviderPayload {
1801 let task_id = raw
1802 .get("task_id")
1803 .and_then(JsonValue::as_str)
1804 .map(ToString::to_string);
1805 let sender = raw
1806 .get("sender")
1807 .and_then(JsonValue::as_str)
1808 .map(ToString::to_string);
1809 ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
1810 task_id,
1811 sender,
1812 raw,
1813 }))
1814}
1815
1816fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
1817 OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
1818}
1819
#[cfg(test)]
mod tests {
    use super::*;

    /// Header set containing both sensitive entries and benign ones.
    fn sample_headers() -> BTreeMap<String, String> {
        let pairs = [
            ("Authorization", "Bearer secret"),
            ("Cookie", "session=abc"),
            ("User-Agent", "GitHub-Hookshot/123"),
            ("X-GitHub-Delivery", "delivery-123"),
            ("X-GitHub-Event", "issues"),
            ("X-Webhook-Token", "token"),
        ];
        pairs
            .iter()
            .map(|(name, value)| (name.to_string(), value.to_string()))
            .collect()
    }

    /// Minimal `github` schema with no signature verification, no secrets and
    /// a placeholder runtime; used to exercise catalog registration.
    fn placeholder_github_schema() -> BuiltinProviderSchema {
        BuiltinProviderSchema {
            provider_id: "github",
            harn_schema_name: "GitHubEventPayload",
            metadata: provider_metadata_entry(
                "github",
                &["webhook"],
                "GitHubEventPayload",
                &[],
                SignatureVerificationMetadata::None,
                Vec::new(),
                ProviderRuntimeMetadata::Placeholder,
            ),
            normalize: github_payload,
        }
    }

    #[test]
    fn default_redaction_policy_keeps_safe_headers() {
        let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());

        // Benign headers pass through untouched.
        for (name, expected) in [
            ("User-Agent", "GitHub-Hookshot/123"),
            ("X-GitHub-Delivery", "delivery-123"),
        ] {
            assert_eq!(redacted.get(name).unwrap(), expected);
        }
        // Sensitive headers are replaced by the redaction marker.
        for name in ["Authorization", "Cookie", "X-Webhook-Token"] {
            assert_eq!(redacted.get(name).unwrap(), REDACTED_HEADER_VALUE);
        }
    }

    #[test]
    fn provider_catalog_rejects_duplicates() {
        let mut catalog = ProviderCatalog::default();
        catalog
            .register(Arc::new(placeholder_github_schema()))
            .unwrap();

        // Registering the same provider id a second time must fail.
        let error = catalog
            .register(Arc::new(placeholder_github_schema()))
            .unwrap_err();
        assert_eq!(
            error,
            ProviderCatalogError::DuplicateProvider("github".to_string())
        );
    }

    #[test]
    fn registered_provider_metadata_marks_builtin_connectors() {
        let entries = registered_provider_metadata();
        let builtin: Vec<&ProviderMetadata> = entries
            .iter()
            .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
            .collect();

        assert_eq!(builtin.len(), 6);
        for expected in ["cron", "github", "linear", "notion", "slack", "webhook"] {
            assert!(builtin.iter().any(|entry| entry.provider == expected));
        }
    }

    #[test]
    fn trigger_event_round_trip_is_stable() {
        let provider = ProviderId::from("github");
        let headers = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
        let body = serde_json::json!({
            "action": "opened",
            "installation": {"id": 42},
            "issue": {"number": 99}
        });
        let payload =
            ProviderPayload::normalize(&provider, "issues", &sample_headers(), body).unwrap();
        let event = TriggerEvent {
            id: TriggerEventId("trigger_evt_fixed".to_string()),
            provider,
            kind: "issues".to_string(),
            received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
            occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
            dedupe_key: "delivery-123".to_string(),
            trace_id: TraceId("trace_fixed".to_string()),
            tenant_id: Some(TenantId("tenant_1".to_string())),
            headers,
            provider_payload: payload,
            signature_status: SignatureStatus::Verified,
            dedupe_claimed: false,
            batch: None,
            raw_body: Some(vec![0, 159, 255, 10]),
        };

        // Serialize, deserialize and serialize again; both the decoded value
        // and the second serialization must match the originals.
        let first_pass = serde_json::to_value(&event).unwrap();
        assert_eq!(first_pass["raw_body"], serde_json::json!("AJ//Cg=="));
        let decoded: TriggerEvent = serde_json::from_value(first_pass.clone()).unwrap();
        let second_pass = serde_json::to_value(&decoded).unwrap();
        assert_eq!(decoded, event);
        assert_eq!(first_pass, second_pass);
    }

    #[test]
    fn unknown_provider_errors() {
        let provider = ProviderId::from("custom-provider");
        let error = ProviderPayload::normalize(
            &provider,
            "thing.happened",
            &BTreeMap::new(),
            serde_json::json!({"ok": true}),
        )
        .unwrap_err();
        assert_eq!(
            error,
            ProviderCatalogError::UnknownProvider("custom-provider".to_string())
        );
    }
}