1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
11const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13fn serialize_optional_bytes_b64<S>(
14 value: &Option<Vec<u8>>,
15 serializer: S,
16) -> Result<S::Ok, S::Error>
17where
18 S: Serializer,
19{
20 use base64::Engine;
21
22 match value {
23 Some(bytes) => {
24 serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
25 }
26 None => serializer.serialize_none(),
27 }
28}
29
30fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
31where
32 D: Deserializer<'de>,
33{
34 use base64::Engine;
35
36 let encoded = Option::<String>::deserialize(deserializer)?;
37 encoded
38 .map(|text| {
39 base64::engine::general_purpose::STANDARD
40 .decode(text.as_bytes())
41 .map_err(serde::de::Error::custom)
42 })
43 .transpose()
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
47#[serde(transparent)]
48pub struct TriggerEventId(pub String);
49
50impl TriggerEventId {
51 pub fn new() -> Self {
52 Self(format!("trigger_evt_{}", Uuid::now_v7()))
53 }
54}
55
56impl Default for TriggerEventId {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
63#[serde(transparent)]
64pub struct ProviderId(pub String);
65
66impl ProviderId {
67 pub fn new(value: impl Into<String>) -> Self {
68 Self(value.into())
69 }
70
71 pub fn as_str(&self) -> &str {
72 self.0.as_str()
73 }
74}
75
76impl From<&str> for ProviderId {
77 fn from(value: &str) -> Self {
78 Self::new(value)
79 }
80}
81
82impl From<String> for ProviderId {
83 fn from(value: String) -> Self {
84 Self::new(value)
85 }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
89#[serde(transparent)]
90pub struct TraceId(pub String);
91
92impl TraceId {
93 pub fn new() -> Self {
94 Self(format!("trace_{}", Uuid::now_v7()))
95 }
96}
97
98impl Default for TraceId {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
105#[serde(transparent)]
106pub struct TenantId(pub String);
107
108impl TenantId {
109 pub fn new(value: impl Into<String>) -> Self {
110 Self(value.into())
111 }
112}
113
114#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(tag = "state", rename_all = "snake_case")]
116pub enum SignatureStatus {
117 Verified,
118 Unsigned,
119 Failed { reason: String },
120}
121
122#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
123pub struct GitHubEventCommon {
124 pub event: String,
125 pub action: Option<String>,
126 pub delivery_id: Option<String>,
127 pub installation_id: Option<i64>,
128 pub raw: JsonValue,
129}
130
131#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
132pub struct GitHubIssuesEventPayload {
133 #[serde(flatten)]
134 pub common: GitHubEventCommon,
135 pub issue: JsonValue,
136}
137
138#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
139pub struct GitHubPullRequestEventPayload {
140 #[serde(flatten)]
141 pub common: GitHubEventCommon,
142 pub pull_request: JsonValue,
143}
144
145#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
146pub struct GitHubIssueCommentEventPayload {
147 #[serde(flatten)]
148 pub common: GitHubEventCommon,
149 pub issue: JsonValue,
150 pub comment: JsonValue,
151}
152
153#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
154pub struct GitHubPullRequestReviewEventPayload {
155 #[serde(flatten)]
156 pub common: GitHubEventCommon,
157 pub pull_request: JsonValue,
158 pub review: JsonValue,
159}
160
161#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
162pub struct GitHubPushEventPayload {
163 #[serde(flatten)]
164 pub common: GitHubEventCommon,
165 #[serde(default)]
166 pub commits: Vec<JsonValue>,
167 pub distinct_size: Option<i64>,
168}
169
170#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
171pub struct GitHubWorkflowRunEventPayload {
172 #[serde(flatten)]
173 pub common: GitHubEventCommon,
174 pub workflow_run: JsonValue,
175}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178pub struct GitHubDeploymentStatusEventPayload {
179 #[serde(flatten)]
180 pub common: GitHubEventCommon,
181 pub deployment_status: JsonValue,
182 pub deployment: JsonValue,
183}
184
185#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
186pub struct GitHubCheckRunEventPayload {
187 #[serde(flatten)]
188 pub common: GitHubEventCommon,
189 pub check_run: JsonValue,
190}
191
192#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
193#[serde(untagged)]
194pub enum GitHubEventPayload {
195 Issues(GitHubIssuesEventPayload),
196 PullRequest(GitHubPullRequestEventPayload),
197 IssueComment(GitHubIssueCommentEventPayload),
198 PullRequestReview(GitHubPullRequestReviewEventPayload),
199 Push(GitHubPushEventPayload),
200 WorkflowRun(GitHubWorkflowRunEventPayload),
201 DeploymentStatus(GitHubDeploymentStatusEventPayload),
202 CheckRun(GitHubCheckRunEventPayload),
203 Other(GitHubEventCommon),
204}
205
206#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
207pub struct SlackEventCommon {
208 pub event: String,
209 pub event_id: Option<String>,
210 pub api_app_id: Option<String>,
211 pub team_id: Option<String>,
212 pub channel_id: Option<String>,
213 pub user_id: Option<String>,
214 pub event_ts: Option<String>,
215 pub raw: JsonValue,
216}
217
218#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
219pub struct SlackMessageEventPayload {
220 #[serde(flatten)]
221 pub common: SlackEventCommon,
222 pub subtype: Option<String>,
223 pub channel_type: Option<String>,
224 pub channel: Option<String>,
225 pub user: Option<String>,
226 pub text: Option<String>,
227 pub ts: Option<String>,
228 pub thread_ts: Option<String>,
229}
230
231#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
232pub struct SlackAppMentionEventPayload {
233 #[serde(flatten)]
234 pub common: SlackEventCommon,
235 pub channel: Option<String>,
236 pub user: Option<String>,
237 pub text: Option<String>,
238 pub ts: Option<String>,
239 pub thread_ts: Option<String>,
240}
241
242#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
243pub struct SlackReactionAddedEventPayload {
244 #[serde(flatten)]
245 pub common: SlackEventCommon,
246 pub reaction: Option<String>,
247 pub item_user: Option<String>,
248 pub item: JsonValue,
249}
250
251#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
252pub struct SlackAppHomeOpenedEventPayload {
253 #[serde(flatten)]
254 pub common: SlackEventCommon,
255 pub user: Option<String>,
256 pub channel: Option<String>,
257 pub tab: Option<String>,
258 pub view: JsonValue,
259}
260
261#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
262pub struct SlackAssistantThreadStartedEventPayload {
263 #[serde(flatten)]
264 pub common: SlackEventCommon,
265 pub assistant_thread: JsonValue,
266 pub thread_ts: Option<String>,
267 pub context: JsonValue,
268}
269
270#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
271#[serde(untagged)]
272pub enum SlackEventPayload {
273 Message(SlackMessageEventPayload),
274 AppMention(SlackAppMentionEventPayload),
275 ReactionAdded(SlackReactionAddedEventPayload),
276 AppHomeOpened(SlackAppHomeOpenedEventPayload),
277 AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
278 Other(SlackEventCommon),
279}
280
281#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
282pub struct LinearEventCommon {
283 pub event: String,
284 pub action: Option<String>,
285 pub delivery_id: Option<String>,
286 pub organization_id: Option<String>,
287 pub webhook_timestamp: Option<i64>,
288 pub webhook_id: Option<String>,
289 pub url: Option<String>,
290 pub created_at: Option<String>,
291 pub actor: JsonValue,
292 pub raw: JsonValue,
293}
294
295#[derive(Clone, Debug, PartialEq)]
296pub enum LinearIssueChange {
297 Title { previous: Option<String> },
298 Description { previous: Option<String> },
299 Priority { previous: Option<i64> },
300 Estimate { previous: Option<i64> },
301 StateId { previous: Option<String> },
302 TeamId { previous: Option<String> },
303 AssigneeId { previous: Option<String> },
304 ProjectId { previous: Option<String> },
305 CycleId { previous: Option<String> },
306 DueDate { previous: Option<String> },
307 ParentId { previous: Option<String> },
308 SortOrder { previous: Option<f64> },
309 LabelIds { previous: Vec<String> },
310 CompletedAt { previous: Option<String> },
311 Other { field: String, previous: JsonValue },
312}
313
314impl Serialize for LinearIssueChange {
315 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
316 where
317 S: Serializer,
318 {
319 let value = match self {
320 Self::Title { previous } => {
321 serde_json::json!({ "field_name": "title", "previous": previous })
322 }
323 Self::Description { previous } => {
324 serde_json::json!({ "field_name": "description", "previous": previous })
325 }
326 Self::Priority { previous } => {
327 serde_json::json!({ "field_name": "priority", "previous": previous })
328 }
329 Self::Estimate { previous } => {
330 serde_json::json!({ "field_name": "estimate", "previous": previous })
331 }
332 Self::StateId { previous } => {
333 serde_json::json!({ "field_name": "state_id", "previous": previous })
334 }
335 Self::TeamId { previous } => {
336 serde_json::json!({ "field_name": "team_id", "previous": previous })
337 }
338 Self::AssigneeId { previous } => {
339 serde_json::json!({ "field_name": "assignee_id", "previous": previous })
340 }
341 Self::ProjectId { previous } => {
342 serde_json::json!({ "field_name": "project_id", "previous": previous })
343 }
344 Self::CycleId { previous } => {
345 serde_json::json!({ "field_name": "cycle_id", "previous": previous })
346 }
347 Self::DueDate { previous } => {
348 serde_json::json!({ "field_name": "due_date", "previous": previous })
349 }
350 Self::ParentId { previous } => {
351 serde_json::json!({ "field_name": "parent_id", "previous": previous })
352 }
353 Self::SortOrder { previous } => {
354 serde_json::json!({ "field_name": "sort_order", "previous": previous })
355 }
356 Self::LabelIds { previous } => {
357 serde_json::json!({ "field_name": "label_ids", "previous": previous })
358 }
359 Self::CompletedAt { previous } => {
360 serde_json::json!({ "field_name": "completed_at", "previous": previous })
361 }
362 Self::Other { field, previous } => {
363 serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
364 }
365 };
366 value.serialize(serializer)
367 }
368}
369
370impl<'de> Deserialize<'de> for LinearIssueChange {
371 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
372 where
373 D: Deserializer<'de>,
374 {
375 let value = JsonValue::deserialize(deserializer)?;
376 let field_name = value
377 .get("field_name")
378 .and_then(JsonValue::as_str)
379 .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
380 let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
381 Ok(match field_name {
382 "title" => Self::Title {
383 previous: previous.as_str().map(ToString::to_string),
384 },
385 "description" => Self::Description {
386 previous: previous.as_str().map(ToString::to_string),
387 },
388 "priority" => Self::Priority {
389 previous: parse_json_i64ish(&previous),
390 },
391 "estimate" => Self::Estimate {
392 previous: parse_json_i64ish(&previous),
393 },
394 "state_id" => Self::StateId {
395 previous: previous.as_str().map(ToString::to_string),
396 },
397 "team_id" => Self::TeamId {
398 previous: previous.as_str().map(ToString::to_string),
399 },
400 "assignee_id" => Self::AssigneeId {
401 previous: previous.as_str().map(ToString::to_string),
402 },
403 "project_id" => Self::ProjectId {
404 previous: previous.as_str().map(ToString::to_string),
405 },
406 "cycle_id" => Self::CycleId {
407 previous: previous.as_str().map(ToString::to_string),
408 },
409 "due_date" => Self::DueDate {
410 previous: previous.as_str().map(ToString::to_string),
411 },
412 "parent_id" => Self::ParentId {
413 previous: previous.as_str().map(ToString::to_string),
414 },
415 "sort_order" => Self::SortOrder {
416 previous: previous.as_f64(),
417 },
418 "label_ids" => Self::LabelIds {
419 previous: parse_string_array(&previous),
420 },
421 "completed_at" => Self::CompletedAt {
422 previous: previous.as_str().map(ToString::to_string),
423 },
424 "other" => Self::Other {
425 field: value
426 .get("field")
427 .and_then(JsonValue::as_str)
428 .map(ToString::to_string)
429 .unwrap_or_else(|| "unknown".to_string()),
430 previous,
431 },
432 other => Self::Other {
433 field: other.to_string(),
434 previous,
435 },
436 })
437 }
438}
439
440#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
441pub struct LinearIssueEventPayload {
442 #[serde(flatten)]
443 pub common: LinearEventCommon,
444 pub issue: JsonValue,
445 #[serde(default)]
446 pub changes: Vec<LinearIssueChange>,
447}
448
449#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
450pub struct LinearIssueCommentEventPayload {
451 #[serde(flatten)]
452 pub common: LinearEventCommon,
453 pub comment: JsonValue,
454}
455
456#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
457pub struct LinearIssueLabelEventPayload {
458 #[serde(flatten)]
459 pub common: LinearEventCommon,
460 pub label: JsonValue,
461}
462
463#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
464pub struct LinearProjectEventPayload {
465 #[serde(flatten)]
466 pub common: LinearEventCommon,
467 pub project: JsonValue,
468}
469
470#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
471pub struct LinearCycleEventPayload {
472 #[serde(flatten)]
473 pub common: LinearEventCommon,
474 pub cycle: JsonValue,
475}
476
477#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
478pub struct LinearCustomerEventPayload {
479 #[serde(flatten)]
480 pub common: LinearEventCommon,
481 pub customer: JsonValue,
482}
483
484#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
485pub struct LinearCustomerRequestEventPayload {
486 #[serde(flatten)]
487 pub common: LinearEventCommon,
488 pub customer_request: JsonValue,
489}
490
491#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
492#[serde(untagged)]
493pub enum LinearEventPayload {
494 Issue(LinearIssueEventPayload),
495 IssueComment(LinearIssueCommentEventPayload),
496 IssueLabel(LinearIssueLabelEventPayload),
497 Project(LinearProjectEventPayload),
498 Cycle(LinearCycleEventPayload),
499 Customer(LinearCustomerEventPayload),
500 CustomerRequest(LinearCustomerRequestEventPayload),
501 Other(LinearEventCommon),
502}
503
504#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
505pub struct NotionPolledChangeEvent {
506 pub resource: String,
507 pub source_id: String,
508 pub entity_id: String,
509 pub high_water_mark: String,
510 pub before: Option<JsonValue>,
511 pub after: JsonValue,
512}
513
514#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
515pub struct NotionEventPayload {
516 pub event: String,
517 pub workspace_id: Option<String>,
518 pub request_id: Option<String>,
519 pub subscription_id: Option<String>,
520 pub integration_id: Option<String>,
521 pub attempt_number: Option<u32>,
522 pub entity_id: Option<String>,
523 pub entity_type: Option<String>,
524 pub api_version: Option<String>,
525 pub verification_token: Option<String>,
526 pub polled: Option<NotionPolledChangeEvent>,
527 pub raw: JsonValue,
528}
529
530#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
531pub struct CronEventPayload {
532 pub cron_id: Option<String>,
533 pub schedule: Option<String>,
534 #[serde(with = "time::serde::rfc3339")]
535 pub tick_at: OffsetDateTime,
536 pub raw: JsonValue,
537}
538
539#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
540pub struct GenericWebhookPayload {
541 pub source: Option<String>,
542 pub content_type: Option<String>,
543 pub raw: JsonValue,
544}
545
546#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
547pub struct A2aPushPayload {
548 pub task_id: Option<String>,
549 pub task_state: Option<String>,
550 pub artifact: Option<JsonValue>,
551 pub sender: Option<String>,
552 pub raw: JsonValue,
553 pub kind: String,
554}
555
556#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
557pub struct StreamEventPayload {
558 pub event: String,
559 pub source: Option<String>,
560 pub stream: Option<String>,
561 pub partition: Option<String>,
562 pub offset: Option<String>,
563 pub key: Option<String>,
564 pub timestamp: Option<String>,
565 #[serde(default)]
566 pub headers: BTreeMap<String, String>,
567 pub raw: JsonValue,
568}
569
570#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
571pub struct ExtensionProviderPayload {
572 pub provider: String,
573 pub schema_name: String,
574 pub raw: JsonValue,
575}
576
577#[allow(clippy::large_enum_variant)]
578#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
579#[serde(untagged)]
580pub enum ProviderPayload {
581 Known(KnownProviderPayload),
582 Extension(ExtensionProviderPayload),
583}
584
585impl ProviderPayload {
586 pub fn provider(&self) -> &str {
587 match self {
588 Self::Known(known) => known.provider(),
589 Self::Extension(payload) => payload.provider.as_str(),
590 }
591 }
592
593 pub fn normalize(
594 provider: &ProviderId,
595 kind: &str,
596 headers: &BTreeMap<String, String>,
597 raw: JsonValue,
598 ) -> Result<Self, ProviderCatalogError> {
599 provider_catalog()
600 .read()
601 .expect("provider catalog poisoned")
602 .normalize(provider, kind, headers, raw)
603 }
604}
605
606#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
607#[serde(tag = "provider")]
608pub enum KnownProviderPayload {
609 #[serde(rename = "github")]
610 GitHub(GitHubEventPayload),
611 #[serde(rename = "slack")]
612 Slack(Box<SlackEventPayload>),
613 #[serde(rename = "linear")]
614 Linear(LinearEventPayload),
615 #[serde(rename = "notion")]
616 Notion(Box<NotionEventPayload>),
617 #[serde(rename = "cron")]
618 Cron(CronEventPayload),
619 #[serde(rename = "webhook")]
620 Webhook(GenericWebhookPayload),
621 #[serde(rename = "a2a-push")]
622 A2aPush(A2aPushPayload),
623 #[serde(rename = "kafka")]
624 Kafka(StreamEventPayload),
625 #[serde(rename = "nats")]
626 Nats(StreamEventPayload),
627 #[serde(rename = "pulsar")]
628 Pulsar(StreamEventPayload),
629 #[serde(rename = "postgres-cdc")]
630 PostgresCdc(StreamEventPayload),
631 #[serde(rename = "email")]
632 Email(StreamEventPayload),
633 #[serde(rename = "websocket")]
634 Websocket(StreamEventPayload),
635}
636
637impl KnownProviderPayload {
638 pub fn provider(&self) -> &str {
639 match self {
640 Self::GitHub(_) => "github",
641 Self::Slack(_) => "slack",
642 Self::Linear(_) => "linear",
643 Self::Notion(_) => "notion",
644 Self::Cron(_) => "cron",
645 Self::Webhook(_) => "webhook",
646 Self::A2aPush(_) => "a2a-push",
647 Self::Kafka(_) => "kafka",
648 Self::Nats(_) => "nats",
649 Self::Pulsar(_) => "pulsar",
650 Self::PostgresCdc(_) => "postgres-cdc",
651 Self::Email(_) => "email",
652 Self::Websocket(_) => "websocket",
653 }
654 }
655}
656
657#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
658pub struct TriggerEvent {
659 pub id: TriggerEventId,
660 pub provider: ProviderId,
661 pub kind: String,
662 #[serde(with = "time::serde::rfc3339")]
663 pub received_at: OffsetDateTime,
664 #[serde(with = "time::serde::rfc3339::option")]
665 pub occurred_at: Option<OffsetDateTime>,
666 pub dedupe_key: String,
667 pub trace_id: TraceId,
668 pub tenant_id: Option<TenantId>,
669 pub headers: BTreeMap<String, String>,
670 #[serde(default, skip_serializing_if = "Option::is_none")]
671 pub batch: Option<Vec<JsonValue>>,
672 #[serde(
673 default,
674 skip_serializing_if = "Option::is_none",
675 serialize_with = "serialize_optional_bytes_b64",
676 deserialize_with = "deserialize_optional_bytes_b64"
677 )]
678 pub raw_body: Option<Vec<u8>>,
679 pub provider_payload: ProviderPayload,
680 pub signature_status: SignatureStatus,
681 #[serde(skip)]
682 pub dedupe_claimed: bool,
683}
684
685impl TriggerEvent {
686 #[allow(clippy::too_many_arguments)]
687 pub fn new(
688 provider: ProviderId,
689 kind: impl Into<String>,
690 occurred_at: Option<OffsetDateTime>,
691 dedupe_key: impl Into<String>,
692 tenant_id: Option<TenantId>,
693 headers: BTreeMap<String, String>,
694 provider_payload: ProviderPayload,
695 signature_status: SignatureStatus,
696 ) -> Self {
697 Self {
698 id: TriggerEventId::new(),
699 provider,
700 kind: kind.into(),
701 received_at: clock::now_utc(),
702 occurred_at,
703 dedupe_key: dedupe_key.into(),
704 trace_id: TraceId::new(),
705 tenant_id,
706 headers,
707 batch: None,
708 raw_body: None,
709 provider_payload,
710 signature_status,
711 dedupe_claimed: false,
712 }
713 }
714
715 pub fn dedupe_claimed(&self) -> bool {
716 self.dedupe_claimed
717 }
718
719 pub fn mark_dedupe_claimed(&mut self) {
720 self.dedupe_claimed = true;
721 }
722}
723
724#[derive(Clone, Debug, PartialEq, Eq)]
725pub struct HeaderRedactionPolicy {
726 safe_exact_names: BTreeSet<String>,
727}
728
729impl HeaderRedactionPolicy {
730 pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
731 self.safe_exact_names
732 .insert(name.into().to_ascii_lowercase());
733 self
734 }
735
736 fn should_keep(&self, name: &str) -> bool {
737 let lower = name.to_ascii_lowercase();
738 if self.safe_exact_names.contains(lower.as_str()) {
739 return true;
740 }
741 matches!(
742 lower.as_str(),
743 "user-agent"
744 | "request-id"
745 | "x-request-id"
746 | "x-correlation-id"
747 | "content-type"
748 | "content-length"
749 | "x-github-event"
750 | "x-github-delivery"
751 | "x-github-hook-id"
752 | "x-hub-signature-256"
753 | "x-slack-request-timestamp"
754 | "x-slack-signature"
755 | "x-linear-signature"
756 | "x-notion-signature"
757 | "x-a2a-signature"
758 | "x-a2a-delivery"
759 ) || lower.ends_with("-event")
760 || lower.ends_with("-delivery")
761 || lower.contains("timestamp")
762 || lower.contains("request-id")
763 }
764
765 fn should_redact(&self, name: &str) -> bool {
766 let lower = name.to_ascii_lowercase();
767 if self.should_keep(lower.as_str()) {
768 return false;
769 }
770 lower.contains("authorization")
771 || lower.contains("cookie")
772 || lower.contains("secret")
773 || lower.contains("token")
774 || lower.contains("key")
775 }
776}
777
778impl Default for HeaderRedactionPolicy {
779 fn default() -> Self {
780 Self {
781 safe_exact_names: BTreeSet::from([
782 "content-length".to_string(),
783 "content-type".to_string(),
784 "request-id".to_string(),
785 "user-agent".to_string(),
786 "x-a2a-delivery".to_string(),
787 "x-a2a-signature".to_string(),
788 "x-correlation-id".to_string(),
789 "x-github-delivery".to_string(),
790 "x-github-event".to_string(),
791 "x-github-hook-id".to_string(),
792 "x-hub-signature-256".to_string(),
793 "x-linear-signature".to_string(),
794 "x-notion-signature".to_string(),
795 "x-request-id".to_string(),
796 "x-slack-request-timestamp".to_string(),
797 "x-slack-signature".to_string(),
798 ]),
799 }
800 }
801}
802
803pub fn redact_headers(
804 headers: &BTreeMap<String, String>,
805 policy: &HeaderRedactionPolicy,
806) -> BTreeMap<String, String> {
807 headers
808 .iter()
809 .map(|(name, value)| {
810 if policy.should_redact(name) {
811 (name.clone(), REDACTED_HEADER_VALUE.to_string())
812 } else {
813 (name.clone(), value.clone())
814 }
815 })
816 .collect()
817}
818
819#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
820pub struct ProviderSecretRequirement {
821 pub name: String,
822 pub required: bool,
823 pub namespace: String,
824}
825
826#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
827pub struct ProviderOutboundMethod {
828 pub name: String,
829}
830
831#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
832#[serde(tag = "kind", rename_all = "snake_case")]
833pub enum SignatureVerificationMetadata {
834 #[default]
835 None,
836 Hmac {
837 variant: String,
838 raw_body: bool,
839 signature_header: String,
840 timestamp_header: Option<String>,
841 id_header: Option<String>,
842 default_tolerance_secs: Option<i64>,
843 digest: String,
844 encoding: String,
845 },
846}
847
848#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
849#[serde(tag = "kind", rename_all = "snake_case")]
850pub enum ProviderRuntimeMetadata {
851 Builtin {
852 connector: String,
853 default_signature_variant: Option<String>,
854 },
855 #[default]
856 Placeholder,
857}
858
859#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
860pub struct ProviderMetadata {
861 pub provider: String,
862 #[serde(default)]
863 pub kinds: Vec<String>,
864 pub schema_name: String,
865 #[serde(default)]
866 pub outbound_methods: Vec<ProviderOutboundMethod>,
867 #[serde(default)]
868 pub secret_requirements: Vec<ProviderSecretRequirement>,
869 #[serde(default)]
870 pub signature_verification: SignatureVerificationMetadata,
871 #[serde(default)]
872 pub runtime: ProviderRuntimeMetadata,
873}
874
875impl ProviderMetadata {
876 pub fn supports_kind(&self, kind: &str) -> bool {
877 self.kinds.iter().any(|candidate| candidate == kind)
878 }
879
880 pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
881 self.secret_requirements
882 .iter()
883 .filter(|requirement| requirement.required)
884 .map(|requirement| requirement.name.as_str())
885 }
886}
887
888pub trait ProviderSchema: Send + Sync {
889 fn provider_id(&self) -> &'static str;
890 fn harn_schema_name(&self) -> &'static str;
891 fn metadata(&self) -> ProviderMetadata {
892 ProviderMetadata {
893 provider: self.provider_id().to_string(),
894 schema_name: self.harn_schema_name().to_string(),
895 ..ProviderMetadata::default()
896 }
897 }
898 fn normalize(
899 &self,
900 kind: &str,
901 headers: &BTreeMap<String, String>,
902 raw: JsonValue,
903 ) -> Result<ProviderPayload, ProviderCatalogError>;
904}
905
906#[derive(Clone, Debug, PartialEq, Eq)]
907pub enum ProviderCatalogError {
908 DuplicateProvider(String),
909 UnknownProvider(String),
910}
911
912impl std::fmt::Display for ProviderCatalogError {
913 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
914 match self {
915 Self::DuplicateProvider(provider) => {
916 write!(f, "provider `{provider}` is already registered")
917 }
918 Self::UnknownProvider(provider) => write!(f, "provider `{provider}` is not registered"),
919 }
920 }
921}
922
923impl std::error::Error for ProviderCatalogError {}
924
925#[derive(Clone, Default)]
926pub struct ProviderCatalog {
927 providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
928}
929
930impl ProviderCatalog {
931 pub fn with_defaults() -> Self {
932 let mut catalog = Self::default();
933 for schema in default_provider_schemas() {
934 catalog
935 .register(schema)
936 .expect("default providers must register cleanly");
937 }
938 catalog
939 }
940
941 pub fn register(
942 &mut self,
943 schema: Arc<dyn ProviderSchema>,
944 ) -> Result<(), ProviderCatalogError> {
945 let provider = schema.provider_id().to_string();
946 if self.providers.contains_key(provider.as_str()) {
947 return Err(ProviderCatalogError::DuplicateProvider(provider));
948 }
949 self.providers.insert(provider, schema);
950 Ok(())
951 }
952
953 pub fn normalize(
954 &self,
955 provider: &ProviderId,
956 kind: &str,
957 headers: &BTreeMap<String, String>,
958 raw: JsonValue,
959 ) -> Result<ProviderPayload, ProviderCatalogError> {
960 let schema = self
961 .providers
962 .get(provider.as_str())
963 .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
964 schema.normalize(kind, headers, raw)
965 }
966
967 pub fn schema_names(&self) -> BTreeMap<String, String> {
968 self.providers
969 .iter()
970 .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
971 .collect()
972 }
973
974 pub fn entries(&self) -> Vec<ProviderMetadata> {
975 self.providers
976 .values()
977 .map(|schema| schema.metadata())
978 .collect()
979 }
980
981 pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
982 self.providers.get(provider).map(|schema| schema.metadata())
983 }
984}
985
986pub fn register_provider_schema(
987 schema: Arc<dyn ProviderSchema>,
988) -> Result<(), ProviderCatalogError> {
989 provider_catalog()
990 .write()
991 .expect("provider catalog poisoned")
992 .register(schema)
993}
994
995pub fn reset_provider_catalog() {
996 *provider_catalog()
997 .write()
998 .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
999}
1000
1001pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
1002 provider_catalog()
1003 .read()
1004 .expect("provider catalog poisoned")
1005 .schema_names()
1006}
1007
1008pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
1009 provider_catalog()
1010 .read()
1011 .expect("provider catalog poisoned")
1012 .entries()
1013}
1014
1015pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
1016 provider_catalog()
1017 .read()
1018 .expect("provider catalog poisoned")
1019 .metadata_for(provider)
1020}
1021
1022fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
1023 static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
1024 PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
1025}
1026
1027struct BuiltinProviderSchema {
1028 provider_id: &'static str,
1029 harn_schema_name: &'static str,
1030 metadata: ProviderMetadata,
1031 normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1032}
1033
1034impl ProviderSchema for BuiltinProviderSchema {
1035 fn provider_id(&self) -> &'static str {
1036 self.provider_id
1037 }
1038
1039 fn harn_schema_name(&self) -> &'static str {
1040 self.harn_schema_name
1041 }
1042
1043 fn metadata(&self) -> ProviderMetadata {
1044 self.metadata.clone()
1045 }
1046
1047 fn normalize(
1048 &self,
1049 kind: &str,
1050 headers: &BTreeMap<String, String>,
1051 raw: JsonValue,
1052 ) -> Result<ProviderPayload, ProviderCatalogError> {
1053 Ok((self.normalize)(kind, headers, raw))
1054 }
1055}
1056
1057fn provider_metadata_entry(
1058 provider: &str,
1059 kinds: &[&str],
1060 schema_name: &str,
1061 outbound_methods: &[&str],
1062 signature_verification: SignatureVerificationMetadata,
1063 secret_requirements: Vec<ProviderSecretRequirement>,
1064 runtime: ProviderRuntimeMetadata,
1065) -> ProviderMetadata {
1066 ProviderMetadata {
1067 provider: provider.to_string(),
1068 kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
1069 schema_name: schema_name.to_string(),
1070 outbound_methods: outbound_methods
1071 .iter()
1072 .map(|name| ProviderOutboundMethod {
1073 name: (*name).to_string(),
1074 })
1075 .collect(),
1076 secret_requirements,
1077 signature_verification,
1078 runtime,
1079 }
1080}
1081
1082fn hmac_signature_metadata(
1083 variant: &str,
1084 signature_header: &str,
1085 timestamp_header: Option<&str>,
1086 id_header: Option<&str>,
1087 default_tolerance_secs: Option<i64>,
1088 encoding: &str,
1089) -> SignatureVerificationMetadata {
1090 SignatureVerificationMetadata::Hmac {
1091 variant: variant.to_string(),
1092 raw_body: true,
1093 signature_header: signature_header.to_string(),
1094 timestamp_header: timestamp_header.map(ToString::to_string),
1095 id_header: id_header.map(ToString::to_string),
1096 default_tolerance_secs,
1097 digest: "sha256".to_string(),
1098 encoding: encoding.to_string(),
1099 }
1100}
1101
1102fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1103 ProviderSecretRequirement {
1104 name: name.to_string(),
1105 required: true,
1106 namespace: namespace.to_string(),
1107 }
1108}
1109
1110fn outbound_method(name: &str) -> ProviderOutboundMethod {
1111 ProviderOutboundMethod {
1112 name: name.to_string(),
1113 }
1114}
1115
1116fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1117 ProviderSecretRequirement {
1118 name: name.to_string(),
1119 required: false,
1120 namespace: namespace.to_string(),
1121 }
1122}
1123
1124fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1125 vec![
1126 Arc::new(BuiltinProviderSchema {
1127 provider_id: "github",
1128 harn_schema_name: "GitHubEventPayload",
1129 metadata: provider_metadata_entry(
1130 "github",
1131 &["webhook"],
1132 "GitHubEventPayload",
1133 &[],
1134 hmac_signature_metadata(
1135 "github",
1136 "X-Hub-Signature-256",
1137 None,
1138 Some("X-GitHub-Delivery"),
1139 None,
1140 "hex",
1141 ),
1142 vec![required_secret("signing_secret", "github")],
1143 ProviderRuntimeMetadata::Builtin {
1144 connector: "webhook".to_string(),
1145 default_signature_variant: Some("github".to_string()),
1146 },
1147 ),
1148 normalize: github_payload,
1149 }),
1150 Arc::new(BuiltinProviderSchema {
1151 provider_id: "slack",
1152 harn_schema_name: "SlackEventPayload",
1153 metadata: provider_metadata_entry(
1154 "slack",
1155 &["webhook"],
1156 "SlackEventPayload",
1157 &[
1158 "post_message",
1159 "update_message",
1160 "add_reaction",
1161 "open_view",
1162 "user_info",
1163 "api_call",
1164 "upload_file",
1165 ],
1166 hmac_signature_metadata(
1167 "slack",
1168 "X-Slack-Signature",
1169 Some("X-Slack-Request-Timestamp"),
1170 None,
1171 Some(300),
1172 "hex",
1173 ),
1174 vec![required_secret("signing_secret", "slack")],
1175 ProviderRuntimeMetadata::Builtin {
1176 connector: "slack".to_string(),
1177 default_signature_variant: Some("slack".to_string()),
1178 },
1179 ),
1180 normalize: slack_payload,
1181 }),
1182 Arc::new(BuiltinProviderSchema {
1183 provider_id: "linear",
1184 harn_schema_name: "LinearEventPayload",
1185 metadata: {
1186 let mut metadata = provider_metadata_entry(
1187 "linear",
1188 &["webhook"],
1189 "LinearEventPayload",
1190 &[],
1191 hmac_signature_metadata(
1192 "linear",
1193 "Linear-Signature",
1194 None,
1195 Some("Linear-Delivery"),
1196 Some(75),
1197 "hex",
1198 ),
1199 vec![
1200 required_secret("signing_secret", "linear"),
1201 optional_secret("access_token", "linear"),
1202 ],
1203 ProviderRuntimeMetadata::Builtin {
1204 connector: "linear".to_string(),
1205 default_signature_variant: Some("linear".to_string()),
1206 },
1207 );
1208 metadata.outbound_methods = vec![
1209 ProviderOutboundMethod {
1210 name: "list_issues".to_string(),
1211 },
1212 ProviderOutboundMethod {
1213 name: "update_issue".to_string(),
1214 },
1215 ProviderOutboundMethod {
1216 name: "create_comment".to_string(),
1217 },
1218 ProviderOutboundMethod {
1219 name: "search".to_string(),
1220 },
1221 ProviderOutboundMethod {
1222 name: "graphql".to_string(),
1223 },
1224 ];
1225 metadata
1226 },
1227 normalize: linear_payload,
1228 }),
1229 Arc::new(BuiltinProviderSchema {
1230 provider_id: "notion",
1231 harn_schema_name: "NotionEventPayload",
1232 metadata: {
1233 let mut metadata = provider_metadata_entry(
1234 "notion",
1235 &["webhook", "poll"],
1236 "NotionEventPayload",
1237 &[],
1238 hmac_signature_metadata(
1239 "notion",
1240 "X-Notion-Signature",
1241 None,
1242 None,
1243 None,
1244 "hex",
1245 ),
1246 vec![required_secret("verification_token", "notion")],
1247 ProviderRuntimeMetadata::Builtin {
1248 connector: "notion".to_string(),
1249 default_signature_variant: Some("notion".to_string()),
1250 },
1251 );
1252 metadata.outbound_methods = vec![
1253 outbound_method("get_page"),
1254 outbound_method("update_page"),
1255 outbound_method("append_blocks"),
1256 outbound_method("query_database"),
1257 outbound_method("search"),
1258 outbound_method("create_comment"),
1259 outbound_method("api_call"),
1260 ];
1261 metadata
1262 },
1263 normalize: notion_payload,
1264 }),
1265 Arc::new(BuiltinProviderSchema {
1266 provider_id: "cron",
1267 harn_schema_name: "CronEventPayload",
1268 metadata: provider_metadata_entry(
1269 "cron",
1270 &["cron"],
1271 "CronEventPayload",
1272 &[],
1273 SignatureVerificationMetadata::None,
1274 Vec::new(),
1275 ProviderRuntimeMetadata::Builtin {
1276 connector: "cron".to_string(),
1277 default_signature_variant: None,
1278 },
1279 ),
1280 normalize: cron_payload,
1281 }),
1282 Arc::new(BuiltinProviderSchema {
1283 provider_id: "webhook",
1284 harn_schema_name: "GenericWebhookPayload",
1285 metadata: provider_metadata_entry(
1286 "webhook",
1287 &["webhook"],
1288 "GenericWebhookPayload",
1289 &[],
1290 hmac_signature_metadata(
1291 "standard",
1292 "webhook-signature",
1293 Some("webhook-timestamp"),
1294 Some("webhook-id"),
1295 Some(300),
1296 "base64",
1297 ),
1298 vec![required_secret("signing_secret", "webhook")],
1299 ProviderRuntimeMetadata::Builtin {
1300 connector: "webhook".to_string(),
1301 default_signature_variant: Some("standard".to_string()),
1302 },
1303 ),
1304 normalize: webhook_payload,
1305 }),
1306 Arc::new(BuiltinProviderSchema {
1307 provider_id: "a2a-push",
1308 harn_schema_name: "A2aPushPayload",
1309 metadata: provider_metadata_entry(
1310 "a2a-push",
1311 &["a2a-push"],
1312 "A2aPushPayload",
1313 &[],
1314 SignatureVerificationMetadata::None,
1315 Vec::new(),
1316 ProviderRuntimeMetadata::Builtin {
1317 connector: "a2a-push".to_string(),
1318 default_signature_variant: None,
1319 },
1320 ),
1321 normalize: a2a_push_payload,
1322 }),
1323 Arc::new(stream_provider_schema("kafka", kafka_payload)),
1324 Arc::new(stream_provider_schema("nats", nats_payload)),
1325 Arc::new(stream_provider_schema("pulsar", pulsar_payload)),
1326 Arc::new(stream_provider_schema("postgres-cdc", postgres_cdc_payload)),
1327 Arc::new(stream_provider_schema("email", email_payload)),
1328 Arc::new(stream_provider_schema("websocket", websocket_payload)),
1329 ]
1330}
1331
1332fn stream_provider_schema(
1333 provider_id: &'static str,
1334 normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1335) -> BuiltinProviderSchema {
1336 BuiltinProviderSchema {
1337 provider_id,
1338 harn_schema_name: "StreamEventPayload",
1339 metadata: provider_metadata_entry(
1340 provider_id,
1341 &["stream"],
1342 "StreamEventPayload",
1343 &[],
1344 SignatureVerificationMetadata::None,
1345 Vec::new(),
1346 ProviderRuntimeMetadata::Placeholder,
1347 ),
1348 normalize,
1349 }
1350}
1351
1352fn github_payload(
1353 kind: &str,
1354 headers: &BTreeMap<String, String>,
1355 raw: JsonValue,
1356) -> ProviderPayload {
1357 let common = GitHubEventCommon {
1358 event: kind.to_string(),
1359 action: raw
1360 .get("action")
1361 .and_then(JsonValue::as_str)
1362 .map(ToString::to_string),
1363 delivery_id: headers.get("X-GitHub-Delivery").cloned(),
1364 installation_id: raw
1365 .get("installation")
1366 .and_then(|value| value.get("id"))
1367 .and_then(JsonValue::as_i64),
1368 raw: raw.clone(),
1369 };
1370 let payload = match kind {
1371 "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1372 common,
1373 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1374 }),
1375 "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1376 common,
1377 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1378 }),
1379 "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1380 common,
1381 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1382 comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1383 }),
1384 "pull_request_review" => {
1385 GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1386 common,
1387 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1388 review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1389 })
1390 }
1391 "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1392 common,
1393 commits: raw
1394 .get("commits")
1395 .and_then(JsonValue::as_array)
1396 .cloned()
1397 .unwrap_or_default(),
1398 distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1399 }),
1400 "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1401 common,
1402 workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1403 }),
1404 "deployment_status" => {
1405 GitHubEventPayload::DeploymentStatus(GitHubDeploymentStatusEventPayload {
1406 common,
1407 deployment_status: raw
1408 .get("deployment_status")
1409 .cloned()
1410 .unwrap_or(JsonValue::Null),
1411 deployment: raw.get("deployment").cloned().unwrap_or(JsonValue::Null),
1412 })
1413 }
1414 "check_run" => GitHubEventPayload::CheckRun(GitHubCheckRunEventPayload {
1415 common,
1416 check_run: raw.get("check_run").cloned().unwrap_or(JsonValue::Null),
1417 }),
1418 _ => GitHubEventPayload::Other(common),
1419 };
1420 ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1421}
1422
1423fn slack_payload(
1424 kind: &str,
1425 _headers: &BTreeMap<String, String>,
1426 raw: JsonValue,
1427) -> ProviderPayload {
1428 let event = raw.get("event");
1429 let common = SlackEventCommon {
1430 event: kind.to_string(),
1431 event_id: raw
1432 .get("event_id")
1433 .and_then(JsonValue::as_str)
1434 .map(ToString::to_string),
1435 api_app_id: raw
1436 .get("api_app_id")
1437 .and_then(JsonValue::as_str)
1438 .map(ToString::to_string),
1439 team_id: raw
1440 .get("team_id")
1441 .and_then(JsonValue::as_str)
1442 .map(ToString::to_string),
1443 channel_id: slack_channel_id(event),
1444 user_id: slack_user_id(event),
1445 event_ts: event
1446 .and_then(|value| value.get("event_ts"))
1447 .and_then(JsonValue::as_str)
1448 .map(ToString::to_string),
1449 raw: raw.clone(),
1450 };
1451 let payload = match kind {
1452 kind if kind == "message" || kind.starts_with("message.") => {
1453 SlackEventPayload::Message(SlackMessageEventPayload {
1454 subtype: event
1455 .and_then(|value| value.get("subtype"))
1456 .and_then(JsonValue::as_str)
1457 .map(ToString::to_string),
1458 channel_type: event
1459 .and_then(|value| value.get("channel_type"))
1460 .and_then(JsonValue::as_str)
1461 .map(ToString::to_string),
1462 channel: event
1463 .and_then(|value| value.get("channel"))
1464 .and_then(JsonValue::as_str)
1465 .map(ToString::to_string),
1466 user: event
1467 .and_then(|value| value.get("user"))
1468 .and_then(JsonValue::as_str)
1469 .map(ToString::to_string),
1470 text: event
1471 .and_then(|value| value.get("text"))
1472 .and_then(JsonValue::as_str)
1473 .map(ToString::to_string),
1474 ts: event
1475 .and_then(|value| value.get("ts"))
1476 .and_then(JsonValue::as_str)
1477 .map(ToString::to_string),
1478 thread_ts: event
1479 .and_then(|value| value.get("thread_ts"))
1480 .and_then(JsonValue::as_str)
1481 .map(ToString::to_string),
1482 common,
1483 })
1484 }
1485 "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1486 channel: event
1487 .and_then(|value| value.get("channel"))
1488 .and_then(JsonValue::as_str)
1489 .map(ToString::to_string),
1490 user: event
1491 .and_then(|value| value.get("user"))
1492 .and_then(JsonValue::as_str)
1493 .map(ToString::to_string),
1494 text: event
1495 .and_then(|value| value.get("text"))
1496 .and_then(JsonValue::as_str)
1497 .map(ToString::to_string),
1498 ts: event
1499 .and_then(|value| value.get("ts"))
1500 .and_then(JsonValue::as_str)
1501 .map(ToString::to_string),
1502 thread_ts: event
1503 .and_then(|value| value.get("thread_ts"))
1504 .and_then(JsonValue::as_str)
1505 .map(ToString::to_string),
1506 common,
1507 }),
1508 "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1509 reaction: event
1510 .and_then(|value| value.get("reaction"))
1511 .and_then(JsonValue::as_str)
1512 .map(ToString::to_string),
1513 item_user: event
1514 .and_then(|value| value.get("item_user"))
1515 .and_then(JsonValue::as_str)
1516 .map(ToString::to_string),
1517 item: event
1518 .and_then(|value| value.get("item"))
1519 .cloned()
1520 .unwrap_or(JsonValue::Null),
1521 common,
1522 }),
1523 "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1524 user: event
1525 .and_then(|value| value.get("user"))
1526 .and_then(JsonValue::as_str)
1527 .map(ToString::to_string),
1528 channel: event
1529 .and_then(|value| value.get("channel"))
1530 .and_then(JsonValue::as_str)
1531 .map(ToString::to_string),
1532 tab: event
1533 .and_then(|value| value.get("tab"))
1534 .and_then(JsonValue::as_str)
1535 .map(ToString::to_string),
1536 view: event
1537 .and_then(|value| value.get("view"))
1538 .cloned()
1539 .unwrap_or(JsonValue::Null),
1540 common,
1541 }),
1542 "assistant_thread_started" => {
1543 let assistant_thread = event
1544 .and_then(|value| value.get("assistant_thread"))
1545 .cloned()
1546 .unwrap_or(JsonValue::Null);
1547 SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1548 thread_ts: assistant_thread
1549 .get("thread_ts")
1550 .and_then(JsonValue::as_str)
1551 .map(ToString::to_string),
1552 context: assistant_thread
1553 .get("context")
1554 .cloned()
1555 .unwrap_or(JsonValue::Null),
1556 assistant_thread,
1557 common,
1558 })
1559 }
1560 _ => SlackEventPayload::Other(common),
1561 };
1562 ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1563}
1564
1565fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1566 event
1567 .and_then(|value| value.get("channel"))
1568 .and_then(JsonValue::as_str)
1569 .map(ToString::to_string)
1570 .or_else(|| {
1571 event
1572 .and_then(|value| value.get("item"))
1573 .and_then(|value| value.get("channel"))
1574 .and_then(JsonValue::as_str)
1575 .map(ToString::to_string)
1576 })
1577 .or_else(|| {
1578 event
1579 .and_then(|value| value.get("channel"))
1580 .and_then(|value| value.get("id"))
1581 .and_then(JsonValue::as_str)
1582 .map(ToString::to_string)
1583 })
1584 .or_else(|| {
1585 event
1586 .and_then(|value| value.get("assistant_thread"))
1587 .and_then(|value| value.get("channel_id"))
1588 .and_then(JsonValue::as_str)
1589 .map(ToString::to_string)
1590 })
1591}
1592
1593fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1594 event
1595 .and_then(|value| value.get("user"))
1596 .and_then(JsonValue::as_str)
1597 .map(ToString::to_string)
1598 .or_else(|| {
1599 event
1600 .and_then(|value| value.get("user"))
1601 .and_then(|value| value.get("id"))
1602 .and_then(JsonValue::as_str)
1603 .map(ToString::to_string)
1604 })
1605 .or_else(|| {
1606 event
1607 .and_then(|value| value.get("item_user"))
1608 .and_then(JsonValue::as_str)
1609 .map(ToString::to_string)
1610 })
1611 .or_else(|| {
1612 event
1613 .and_then(|value| value.get("assistant_thread"))
1614 .and_then(|value| value.get("user_id"))
1615 .and_then(JsonValue::as_str)
1616 .map(ToString::to_string)
1617 })
1618}
1619
1620fn linear_payload(
1621 _kind: &str,
1622 headers: &BTreeMap<String, String>,
1623 raw: JsonValue,
1624) -> ProviderPayload {
1625 let common = linear_event_common(headers, &raw);
1626 let event = common.event.clone();
1627 let payload = match event.as_str() {
1628 "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1629 common,
1630 issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1631 changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1632 }),
1633 "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1634 common,
1635 comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1636 }),
1637 "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1638 common,
1639 label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1640 }),
1641 "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1642 common,
1643 project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1644 }),
1645 "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1646 common,
1647 cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1648 }),
1649 "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
1650 common,
1651 customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1652 }),
1653 "customer_request" => {
1654 LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
1655 common,
1656 customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1657 })
1658 }
1659 _ => LinearEventPayload::Other(common),
1660 };
1661 ProviderPayload::Known(KnownProviderPayload::Linear(payload))
1662}
1663
1664fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
1665 LinearEventCommon {
1666 event: linear_event_name(
1667 raw.get("type")
1668 .and_then(JsonValue::as_str)
1669 .or_else(|| headers.get("Linear-Event").map(String::as_str)),
1670 ),
1671 action: raw
1672 .get("action")
1673 .and_then(JsonValue::as_str)
1674 .map(ToString::to_string),
1675 delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
1676 organization_id: raw
1677 .get("organizationId")
1678 .and_then(JsonValue::as_str)
1679 .map(ToString::to_string),
1680 webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
1681 webhook_id: raw
1682 .get("webhookId")
1683 .and_then(JsonValue::as_str)
1684 .map(ToString::to_string),
1685 url: raw
1686 .get("url")
1687 .and_then(JsonValue::as_str)
1688 .map(ToString::to_string),
1689 created_at: raw
1690 .get("createdAt")
1691 .and_then(JsonValue::as_str)
1692 .map(ToString::to_string),
1693 actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
1694 raw: raw.clone(),
1695 }
1696}
1697
1698fn linear_event_name(raw_type: Option<&str>) -> String {
1699 match raw_type.unwrap_or_default().to_ascii_lowercase().as_str() {
1700 "issue" => "issue".to_string(),
1701 "comment" | "issuecomment" | "issue_comment" => "comment".to_string(),
1702 "issuelabel" | "issue_label" => "issue_label".to_string(),
1703 "project" | "projectupdate" | "project_update" => "project".to_string(),
1704 "cycle" => "cycle".to_string(),
1705 "customer" => "customer".to_string(),
1706 "customerrequest" | "customer_request" => "customer_request".to_string(),
1707 other if !other.is_empty() => other.to_string(),
1708 _ => "other".to_string(),
1709 }
1710}
1711
1712fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
1713 let Some(JsonValue::Object(fields)) = updated_from else {
1714 return Vec::new();
1715 };
1716 let mut changes = Vec::new();
1717 for (field, previous) in fields {
1718 let change = match field.as_str() {
1719 "title" => LinearIssueChange::Title {
1720 previous: previous.as_str().map(ToString::to_string),
1721 },
1722 "description" => LinearIssueChange::Description {
1723 previous: previous.as_str().map(ToString::to_string),
1724 },
1725 "priority" => LinearIssueChange::Priority {
1726 previous: parse_json_i64ish(previous),
1727 },
1728 "estimate" => LinearIssueChange::Estimate {
1729 previous: parse_json_i64ish(previous),
1730 },
1731 "stateId" => LinearIssueChange::StateId {
1732 previous: previous.as_str().map(ToString::to_string),
1733 },
1734 "teamId" => LinearIssueChange::TeamId {
1735 previous: previous.as_str().map(ToString::to_string),
1736 },
1737 "assigneeId" => LinearIssueChange::AssigneeId {
1738 previous: previous.as_str().map(ToString::to_string),
1739 },
1740 "projectId" => LinearIssueChange::ProjectId {
1741 previous: previous.as_str().map(ToString::to_string),
1742 },
1743 "cycleId" => LinearIssueChange::CycleId {
1744 previous: previous.as_str().map(ToString::to_string),
1745 },
1746 "dueDate" => LinearIssueChange::DueDate {
1747 previous: previous.as_str().map(ToString::to_string),
1748 },
1749 "parentId" => LinearIssueChange::ParentId {
1750 previous: previous.as_str().map(ToString::to_string),
1751 },
1752 "sortOrder" => LinearIssueChange::SortOrder {
1753 previous: previous.as_f64(),
1754 },
1755 "labelIds" => LinearIssueChange::LabelIds {
1756 previous: parse_string_array(previous),
1757 },
1758 "completedAt" => LinearIssueChange::CompletedAt {
1759 previous: previous.as_str().map(ToString::to_string),
1760 },
1761 _ => LinearIssueChange::Other {
1762 field: field.clone(),
1763 previous: previous.clone(),
1764 },
1765 };
1766 changes.push(change);
1767 }
1768 changes
1769}
1770
1771fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
1772 value
1773 .as_i64()
1774 .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
1775 .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
1776}
1777
1778fn parse_string_array(value: &JsonValue) -> Vec<String> {
1779 let Some(array) = value.as_array() else {
1780 return Vec::new();
1781 };
1782 array
1783 .iter()
1784 .filter_map(|entry| {
1785 entry.as_str().map(ToString::to_string).or_else(|| {
1786 entry
1787 .get("id")
1788 .and_then(JsonValue::as_str)
1789 .map(ToString::to_string)
1790 })
1791 })
1792 .collect()
1793}
1794
1795fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
1796 headers
1797 .iter()
1798 .find(|(key, _)| key.eq_ignore_ascii_case(name))
1799 .map(|(_, value)| value.as_str())
1800}
1801
1802fn notion_payload(
1803 kind: &str,
1804 headers: &BTreeMap<String, String>,
1805 raw: JsonValue,
1806) -> ProviderPayload {
1807 let workspace_id = raw
1808 .get("workspace_id")
1809 .and_then(JsonValue::as_str)
1810 .map(ToString::to_string);
1811 ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
1812 event: kind.to_string(),
1813 workspace_id,
1814 request_id: headers
1815 .get("request-id")
1816 .cloned()
1817 .or_else(|| headers.get("x-request-id").cloned()),
1818 subscription_id: raw
1819 .get("subscription_id")
1820 .and_then(JsonValue::as_str)
1821 .map(ToString::to_string),
1822 integration_id: raw
1823 .get("integration_id")
1824 .and_then(JsonValue::as_str)
1825 .map(ToString::to_string),
1826 attempt_number: raw
1827 .get("attempt_number")
1828 .and_then(JsonValue::as_u64)
1829 .and_then(|value| u32::try_from(value).ok()),
1830 entity_id: raw
1831 .get("entity")
1832 .and_then(|value| value.get("id"))
1833 .and_then(JsonValue::as_str)
1834 .map(ToString::to_string),
1835 entity_type: raw
1836 .get("entity")
1837 .and_then(|value| value.get("type"))
1838 .and_then(JsonValue::as_str)
1839 .map(ToString::to_string),
1840 api_version: raw
1841 .get("api_version")
1842 .and_then(JsonValue::as_str)
1843 .map(ToString::to_string),
1844 verification_token: raw
1845 .get("verification_token")
1846 .and_then(JsonValue::as_str)
1847 .map(ToString::to_string),
1848 polled: None,
1849 raw,
1850 })))
1851}
1852
1853fn cron_payload(
1854 _kind: &str,
1855 _headers: &BTreeMap<String, String>,
1856 raw: JsonValue,
1857) -> ProviderPayload {
1858 let cron_id = raw
1859 .get("cron_id")
1860 .and_then(JsonValue::as_str)
1861 .map(ToString::to_string);
1862 let schedule = raw
1863 .get("schedule")
1864 .and_then(JsonValue::as_str)
1865 .map(ToString::to_string);
1866 let tick_at = raw
1867 .get("tick_at")
1868 .and_then(JsonValue::as_str)
1869 .and_then(parse_rfc3339)
1870 .unwrap_or_else(OffsetDateTime::now_utc);
1871 ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
1872 cron_id,
1873 schedule,
1874 tick_at,
1875 raw,
1876 }))
1877}
1878
1879fn webhook_payload(
1880 _kind: &str,
1881 headers: &BTreeMap<String, String>,
1882 raw: JsonValue,
1883) -> ProviderPayload {
1884 ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1885 source: headers.get("X-Webhook-Source").cloned(),
1886 content_type: headers.get("Content-Type").cloned(),
1887 raw,
1888 }))
1889}
1890
1891fn a2a_push_payload(
1892 _kind: &str,
1893 _headers: &BTreeMap<String, String>,
1894 raw: JsonValue,
1895) -> ProviderPayload {
1896 let task_id = raw
1897 .get("task_id")
1898 .and_then(JsonValue::as_str)
1899 .map(ToString::to_string);
1900 let sender = raw
1901 .get("sender")
1902 .and_then(JsonValue::as_str)
1903 .map(ToString::to_string);
1904 let task_state = raw
1905 .pointer("/status/state")
1906 .or_else(|| raw.pointer("/statusUpdate/status/state"))
1907 .and_then(JsonValue::as_str)
1908 .map(|state| match state {
1909 "cancelled" => "canceled".to_string(),
1910 other => other.to_string(),
1911 });
1912 let artifact = raw
1913 .pointer("/artifactUpdate/artifact")
1914 .or_else(|| raw.get("artifact"))
1915 .cloned();
1916 let kind = task_state
1917 .as_deref()
1918 .map(|state| format!("a2a.task.{state}"))
1919 .unwrap_or_else(|| "a2a.task.update".to_string());
1920 ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
1921 task_id,
1922 task_state,
1923 artifact,
1924 sender,
1925 raw,
1926 kind,
1927 }))
1928}
1929
1930fn kafka_payload(
1931 kind: &str,
1932 headers: &BTreeMap<String, String>,
1933 raw: JsonValue,
1934) -> ProviderPayload {
1935 ProviderPayload::Known(KnownProviderPayload::Kafka(stream_payload(
1936 kind, headers, raw,
1937 )))
1938}
1939
1940fn nats_payload(kind: &str, headers: &BTreeMap<String, String>, raw: JsonValue) -> ProviderPayload {
1941 ProviderPayload::Known(KnownProviderPayload::Nats(stream_payload(
1942 kind, headers, raw,
1943 )))
1944}
1945
1946fn pulsar_payload(
1947 kind: &str,
1948 headers: &BTreeMap<String, String>,
1949 raw: JsonValue,
1950) -> ProviderPayload {
1951 ProviderPayload::Known(KnownProviderPayload::Pulsar(stream_payload(
1952 kind, headers, raw,
1953 )))
1954}
1955
1956fn postgres_cdc_payload(
1957 kind: &str,
1958 headers: &BTreeMap<String, String>,
1959 raw: JsonValue,
1960) -> ProviderPayload {
1961 ProviderPayload::Known(KnownProviderPayload::PostgresCdc(stream_payload(
1962 kind, headers, raw,
1963 )))
1964}
1965
1966fn email_payload(
1967 kind: &str,
1968 headers: &BTreeMap<String, String>,
1969 raw: JsonValue,
1970) -> ProviderPayload {
1971 ProviderPayload::Known(KnownProviderPayload::Email(stream_payload(
1972 kind, headers, raw,
1973 )))
1974}
1975
1976fn websocket_payload(
1977 kind: &str,
1978 headers: &BTreeMap<String, String>,
1979 raw: JsonValue,
1980) -> ProviderPayload {
1981 ProviderPayload::Known(KnownProviderPayload::Websocket(stream_payload(
1982 kind, headers, raw,
1983 )))
1984}
1985
1986fn stream_payload(
1987 kind: &str,
1988 headers: &BTreeMap<String, String>,
1989 raw: JsonValue,
1990) -> StreamEventPayload {
1991 StreamEventPayload {
1992 event: kind.to_string(),
1993 source: json_stringish(&raw, &["source", "connector", "origin"]),
1994 stream: json_stringish(
1995 &raw,
1996 &["stream", "topic", "subject", "channel", "mailbox", "slot"],
1997 ),
1998 partition: json_stringish(&raw, &["partition", "shard", "consumer"]),
1999 offset: json_stringish(&raw, &["offset", "sequence", "lsn", "message_id"]),
2000 key: json_stringish(&raw, &["key", "message_key", "id", "event_id"]),
2001 timestamp: json_stringish(&raw, &["timestamp", "occurred_at", "received_at", "ts"]),
2002 headers: headers.clone(),
2003 raw,
2004 }
2005}
2006
2007fn json_stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
2008 fields.iter().find_map(|field| {
2009 let value = raw.get(*field)?;
2010 value
2011 .as_str()
2012 .map(ToString::to_string)
2013 .or_else(|| parse_json_i64ish(value).map(|number| number.to_string()))
2014 .or_else(|| value.as_u64().map(|number| number.to_string()))
2015 })
2016}
2017
2018fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
2019 OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
2020}
2021
2022#[cfg(test)]
2023mod tests {
2024 use super::*;
2025
2026 fn sample_headers() -> BTreeMap<String, String> {
2027 BTreeMap::from([
2028 ("Authorization".to_string(), "Bearer secret".to_string()),
2029 ("Cookie".to_string(), "session=abc".to_string()),
2030 ("User-Agent".to_string(), "GitHub-Hookshot/123".to_string()),
2031 ("X-GitHub-Delivery".to_string(), "delivery-123".to_string()),
2032 ("X-GitHub-Event".to_string(), "issues".to_string()),
2033 ("X-Webhook-Token".to_string(), "token".to_string()),
2034 ])
2035 }
2036
2037 #[test]
2038 fn default_redaction_policy_keeps_safe_headers() {
2039 let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
2040 assert_eq!(redacted.get("User-Agent").unwrap(), "GitHub-Hookshot/123");
2041 assert_eq!(redacted.get("X-GitHub-Delivery").unwrap(), "delivery-123");
2042 assert_eq!(
2043 redacted.get("Authorization").unwrap(),
2044 REDACTED_HEADER_VALUE
2045 );
2046 assert_eq!(redacted.get("Cookie").unwrap(), REDACTED_HEADER_VALUE);
2047 assert_eq!(
2048 redacted.get("X-Webhook-Token").unwrap(),
2049 REDACTED_HEADER_VALUE
2050 );
2051 }
2052
2053 #[test]
2054 fn provider_catalog_rejects_duplicates() {
2055 let mut catalog = ProviderCatalog::default();
2056 catalog
2057 .register(Arc::new(BuiltinProviderSchema {
2058 provider_id: "github",
2059 harn_schema_name: "GitHubEventPayload",
2060 metadata: provider_metadata_entry(
2061 "github",
2062 &["webhook"],
2063 "GitHubEventPayload",
2064 &[],
2065 SignatureVerificationMetadata::None,
2066 Vec::new(),
2067 ProviderRuntimeMetadata::Placeholder,
2068 ),
2069 normalize: github_payload,
2070 }))
2071 .unwrap();
2072 let error = catalog
2073 .register(Arc::new(BuiltinProviderSchema {
2074 provider_id: "github",
2075 harn_schema_name: "GitHubEventPayload",
2076 metadata: provider_metadata_entry(
2077 "github",
2078 &["webhook"],
2079 "GitHubEventPayload",
2080 &[],
2081 SignatureVerificationMetadata::None,
2082 Vec::new(),
2083 ProviderRuntimeMetadata::Placeholder,
2084 ),
2085 normalize: github_payload,
2086 }))
2087 .unwrap_err();
2088 assert_eq!(
2089 error,
2090 ProviderCatalogError::DuplicateProvider("github".to_string())
2091 );
2092 }
2093
2094 #[test]
2095 fn registered_provider_metadata_marks_builtin_connectors() {
2096 let entries = registered_provider_metadata();
2097 let builtin: Vec<&ProviderMetadata> = entries
2098 .iter()
2099 .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
2100 .collect();
2101
2102 assert_eq!(builtin.len(), 7);
2103 assert!(builtin.iter().any(|entry| entry.provider == "a2a-push"));
2104 assert!(builtin.iter().any(|entry| entry.provider == "cron"));
2105 assert!(builtin.iter().any(|entry| entry.provider == "github"));
2106 assert!(builtin.iter().any(|entry| entry.provider == "linear"));
2107 assert!(builtin.iter().any(|entry| entry.provider == "notion"));
2108 assert!(builtin.iter().any(|entry| entry.provider == "slack"));
2109 assert!(builtin.iter().any(|entry| entry.provider == "webhook"));
2110 let kafka = entries
2111 .iter()
2112 .find(|entry| entry.provider == "kafka")
2113 .expect("kafka stream provider");
2114 assert_eq!(kafka.kinds, vec!["stream".to_string()]);
2115 assert_eq!(kafka.schema_name, "StreamEventPayload");
2116 assert!(matches!(
2117 kafka.runtime,
2118 ProviderRuntimeMetadata::Placeholder
2119 ));
2120 }
2121
2122 #[test]
2123 fn trigger_event_round_trip_is_stable() {
2124 let provider = ProviderId::from("github");
2125 let headers = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
2126 let payload = ProviderPayload::normalize(
2127 &provider,
2128 "issues",
2129 &sample_headers(),
2130 serde_json::json!({
2131 "action": "opened",
2132 "installation": {"id": 42},
2133 "issue": {"number": 99}
2134 }),
2135 )
2136 .unwrap();
2137 let event = TriggerEvent {
2138 id: TriggerEventId("trigger_evt_fixed".to_string()),
2139 provider,
2140 kind: "issues".to_string(),
2141 received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
2142 occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
2143 dedupe_key: "delivery-123".to_string(),
2144 trace_id: TraceId("trace_fixed".to_string()),
2145 tenant_id: Some(TenantId("tenant_1".to_string())),
2146 headers,
2147 provider_payload: payload,
2148 signature_status: SignatureStatus::Verified,
2149 dedupe_claimed: false,
2150 batch: None,
2151 raw_body: Some(vec![0, 159, 255, 10]),
2152 };
2153
2154 let once = serde_json::to_value(&event).unwrap();
2155 assert_eq!(once["raw_body"], serde_json::json!("AJ//Cg=="));
2156 let decoded: TriggerEvent = serde_json::from_value(once.clone()).unwrap();
2157 let twice = serde_json::to_value(&decoded).unwrap();
2158 assert_eq!(decoded, event);
2159 assert_eq!(once, twice);
2160 }
2161
2162 #[test]
2163 fn unknown_provider_errors() {
2164 let error = ProviderPayload::normalize(
2165 &ProviderId::from("custom-provider"),
2166 "thing.happened",
2167 &BTreeMap::new(),
2168 serde_json::json!({"ok": true}),
2169 )
2170 .unwrap_err();
2171 assert_eq!(
2172 error,
2173 ProviderCatalogError::UnknownProvider("custom-provider".to_string())
2174 );
2175 }
2176
2177 #[test]
2178 fn provider_normalizes_stream_payloads() {
2179 let payload = ProviderPayload::normalize(
2180 &ProviderId::from("kafka"),
2181 "quote.tick",
2182 &BTreeMap::from([("x-source".to_string(), "feed".to_string())]),
2183 serde_json::json!({
2184 "topic": "quotes",
2185 "partition": 7,
2186 "offset": "42",
2187 "key": "AAPL",
2188 "timestamp": "2026-04-21T12:00:00Z"
2189 }),
2190 )
2191 .expect("stream payload");
2192 let ProviderPayload::Known(KnownProviderPayload::Kafka(payload)) = payload else {
2193 panic!("expected kafka stream payload")
2194 };
2195 assert_eq!(payload.event, "quote.tick");
2196 assert_eq!(payload.stream.as_deref(), Some("quotes"));
2197 assert_eq!(payload.partition.as_deref(), Some("7"));
2198 assert_eq!(payload.offset.as_deref(), Some("42"));
2199 assert_eq!(payload.key.as_deref(), Some("AAPL"));
2200 assert_eq!(payload.timestamp.as_deref(), Some("2026-04-21T12:00:00Z"));
2201 }
2202}