1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
11const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13fn serialize_optional_bytes_b64<S>(
14 value: &Option<Vec<u8>>,
15 serializer: S,
16) -> Result<S::Ok, S::Error>
17where
18 S: Serializer,
19{
20 use base64::Engine;
21
22 match value {
23 Some(bytes) => {
24 serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
25 }
26 None => serializer.serialize_none(),
27 }
28}
29
30fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
31where
32 D: Deserializer<'de>,
33{
34 use base64::Engine;
35
36 let encoded = Option::<String>::deserialize(deserializer)?;
37 encoded
38 .map(|text| {
39 base64::engine::general_purpose::STANDARD
40 .decode(text.as_bytes())
41 .map_err(serde::de::Error::custom)
42 })
43 .transpose()
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
47#[serde(transparent)]
48pub struct TriggerEventId(pub String);
49
50impl TriggerEventId {
51 pub fn new() -> Self {
52 Self(format!("trigger_evt_{}", Uuid::now_v7()))
53 }
54}
55
56impl Default for TriggerEventId {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
63#[serde(transparent)]
64pub struct ProviderId(pub String);
65
66impl ProviderId {
67 pub fn new(value: impl Into<String>) -> Self {
68 Self(value.into())
69 }
70
71 pub fn as_str(&self) -> &str {
72 self.0.as_str()
73 }
74}
75
76impl From<&str> for ProviderId {
77 fn from(value: &str) -> Self {
78 Self::new(value)
79 }
80}
81
82impl From<String> for ProviderId {
83 fn from(value: String) -> Self {
84 Self::new(value)
85 }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
89#[serde(transparent)]
90pub struct TraceId(pub String);
91
92impl TraceId {
93 pub fn new() -> Self {
94 Self(format!("trace_{}", Uuid::now_v7()))
95 }
96}
97
98impl Default for TraceId {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
105#[serde(transparent)]
106pub struct TenantId(pub String);
107
108impl TenantId {
109 pub fn new(value: impl Into<String>) -> Self {
110 Self(value.into())
111 }
112}
113
114#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(tag = "state", rename_all = "snake_case")]
116pub enum SignatureStatus {
117 Verified,
118 Unsigned,
119 Failed { reason: String },
120}
121
122#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
123pub struct GitHubEventCommon {
124 pub event: String,
125 pub action: Option<String>,
126 pub delivery_id: Option<String>,
127 pub installation_id: Option<i64>,
128 pub raw: JsonValue,
129}
130
131#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
132pub struct GitHubIssuesEventPayload {
133 #[serde(flatten)]
134 pub common: GitHubEventCommon,
135 pub issue: JsonValue,
136}
137
138#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
139pub struct GitHubPullRequestEventPayload {
140 #[serde(flatten)]
141 pub common: GitHubEventCommon,
142 pub pull_request: JsonValue,
143}
144
145#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
146pub struct GitHubIssueCommentEventPayload {
147 #[serde(flatten)]
148 pub common: GitHubEventCommon,
149 pub issue: JsonValue,
150 pub comment: JsonValue,
151}
152
153#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
154pub struct GitHubPullRequestReviewEventPayload {
155 #[serde(flatten)]
156 pub common: GitHubEventCommon,
157 pub pull_request: JsonValue,
158 pub review: JsonValue,
159}
160
161#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
162pub struct GitHubPushEventPayload {
163 #[serde(flatten)]
164 pub common: GitHubEventCommon,
165 #[serde(default)]
166 pub commits: Vec<JsonValue>,
167 pub distinct_size: Option<i64>,
168}
169
170#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
171pub struct GitHubWorkflowRunEventPayload {
172 #[serde(flatten)]
173 pub common: GitHubEventCommon,
174 pub workflow_run: JsonValue,
175}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178pub struct GitHubDeploymentStatusEventPayload {
179 #[serde(flatten)]
180 pub common: GitHubEventCommon,
181 pub deployment_status: JsonValue,
182 pub deployment: JsonValue,
183}
184
185#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
186pub struct GitHubCheckRunEventPayload {
187 #[serde(flatten)]
188 pub common: GitHubEventCommon,
189 pub check_run: JsonValue,
190}
191
192#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
193#[serde(untagged)]
194pub enum GitHubEventPayload {
195 Issues(GitHubIssuesEventPayload),
196 PullRequest(GitHubPullRequestEventPayload),
197 IssueComment(GitHubIssueCommentEventPayload),
198 PullRequestReview(GitHubPullRequestReviewEventPayload),
199 Push(GitHubPushEventPayload),
200 WorkflowRun(GitHubWorkflowRunEventPayload),
201 DeploymentStatus(GitHubDeploymentStatusEventPayload),
202 CheckRun(GitHubCheckRunEventPayload),
203 Other(GitHubEventCommon),
204}
205
206#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
207pub struct SlackEventCommon {
208 pub event: String,
209 pub event_id: Option<String>,
210 pub api_app_id: Option<String>,
211 pub team_id: Option<String>,
212 pub channel_id: Option<String>,
213 pub user_id: Option<String>,
214 pub event_ts: Option<String>,
215 pub raw: JsonValue,
216}
217
218#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
219pub struct SlackMessageEventPayload {
220 #[serde(flatten)]
221 pub common: SlackEventCommon,
222 pub subtype: Option<String>,
223 pub channel_type: Option<String>,
224 pub channel: Option<String>,
225 pub user: Option<String>,
226 pub text: Option<String>,
227 pub ts: Option<String>,
228 pub thread_ts: Option<String>,
229}
230
231#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
232pub struct SlackAppMentionEventPayload {
233 #[serde(flatten)]
234 pub common: SlackEventCommon,
235 pub channel: Option<String>,
236 pub user: Option<String>,
237 pub text: Option<String>,
238 pub ts: Option<String>,
239 pub thread_ts: Option<String>,
240}
241
242#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
243pub struct SlackReactionAddedEventPayload {
244 #[serde(flatten)]
245 pub common: SlackEventCommon,
246 pub reaction: Option<String>,
247 pub item_user: Option<String>,
248 pub item: JsonValue,
249}
250
251#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
252pub struct SlackAppHomeOpenedEventPayload {
253 #[serde(flatten)]
254 pub common: SlackEventCommon,
255 pub user: Option<String>,
256 pub channel: Option<String>,
257 pub tab: Option<String>,
258 pub view: JsonValue,
259}
260
261#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
262pub struct SlackAssistantThreadStartedEventPayload {
263 #[serde(flatten)]
264 pub common: SlackEventCommon,
265 pub assistant_thread: JsonValue,
266 pub thread_ts: Option<String>,
267 pub context: JsonValue,
268}
269
270#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
271#[serde(untagged)]
272pub enum SlackEventPayload {
273 Message(SlackMessageEventPayload),
274 AppMention(SlackAppMentionEventPayload),
275 ReactionAdded(SlackReactionAddedEventPayload),
276 AppHomeOpened(SlackAppHomeOpenedEventPayload),
277 AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
278 Other(SlackEventCommon),
279}
280
281#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
282pub struct LinearEventCommon {
283 pub event: String,
284 pub action: Option<String>,
285 pub delivery_id: Option<String>,
286 pub organization_id: Option<String>,
287 pub webhook_timestamp: Option<i64>,
288 pub webhook_id: Option<String>,
289 pub url: Option<String>,
290 pub created_at: Option<String>,
291 pub actor: JsonValue,
292 pub raw: JsonValue,
293}
294
295#[derive(Clone, Debug, PartialEq)]
296pub enum LinearIssueChange {
297 Title { previous: Option<String> },
298 Description { previous: Option<String> },
299 Priority { previous: Option<i64> },
300 Estimate { previous: Option<i64> },
301 StateId { previous: Option<String> },
302 TeamId { previous: Option<String> },
303 AssigneeId { previous: Option<String> },
304 ProjectId { previous: Option<String> },
305 CycleId { previous: Option<String> },
306 DueDate { previous: Option<String> },
307 ParentId { previous: Option<String> },
308 SortOrder { previous: Option<f64> },
309 LabelIds { previous: Vec<String> },
310 CompletedAt { previous: Option<String> },
311 Other { field: String, previous: JsonValue },
312}
313
314impl Serialize for LinearIssueChange {
315 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
316 where
317 S: Serializer,
318 {
319 let value = match self {
320 Self::Title { previous } => {
321 serde_json::json!({ "field_name": "title", "previous": previous })
322 }
323 Self::Description { previous } => {
324 serde_json::json!({ "field_name": "description", "previous": previous })
325 }
326 Self::Priority { previous } => {
327 serde_json::json!({ "field_name": "priority", "previous": previous })
328 }
329 Self::Estimate { previous } => {
330 serde_json::json!({ "field_name": "estimate", "previous": previous })
331 }
332 Self::StateId { previous } => {
333 serde_json::json!({ "field_name": "state_id", "previous": previous })
334 }
335 Self::TeamId { previous } => {
336 serde_json::json!({ "field_name": "team_id", "previous": previous })
337 }
338 Self::AssigneeId { previous } => {
339 serde_json::json!({ "field_name": "assignee_id", "previous": previous })
340 }
341 Self::ProjectId { previous } => {
342 serde_json::json!({ "field_name": "project_id", "previous": previous })
343 }
344 Self::CycleId { previous } => {
345 serde_json::json!({ "field_name": "cycle_id", "previous": previous })
346 }
347 Self::DueDate { previous } => {
348 serde_json::json!({ "field_name": "due_date", "previous": previous })
349 }
350 Self::ParentId { previous } => {
351 serde_json::json!({ "field_name": "parent_id", "previous": previous })
352 }
353 Self::SortOrder { previous } => {
354 serde_json::json!({ "field_name": "sort_order", "previous": previous })
355 }
356 Self::LabelIds { previous } => {
357 serde_json::json!({ "field_name": "label_ids", "previous": previous })
358 }
359 Self::CompletedAt { previous } => {
360 serde_json::json!({ "field_name": "completed_at", "previous": previous })
361 }
362 Self::Other { field, previous } => {
363 serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
364 }
365 };
366 value.serialize(serializer)
367 }
368}
369
370impl<'de> Deserialize<'de> for LinearIssueChange {
371 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
372 where
373 D: Deserializer<'de>,
374 {
375 let value = JsonValue::deserialize(deserializer)?;
376 let field_name = value
377 .get("field_name")
378 .and_then(JsonValue::as_str)
379 .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
380 let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
381 Ok(match field_name {
382 "title" => Self::Title {
383 previous: previous.as_str().map(ToString::to_string),
384 },
385 "description" => Self::Description {
386 previous: previous.as_str().map(ToString::to_string),
387 },
388 "priority" => Self::Priority {
389 previous: parse_json_i64ish(&previous),
390 },
391 "estimate" => Self::Estimate {
392 previous: parse_json_i64ish(&previous),
393 },
394 "state_id" => Self::StateId {
395 previous: previous.as_str().map(ToString::to_string),
396 },
397 "team_id" => Self::TeamId {
398 previous: previous.as_str().map(ToString::to_string),
399 },
400 "assignee_id" => Self::AssigneeId {
401 previous: previous.as_str().map(ToString::to_string),
402 },
403 "project_id" => Self::ProjectId {
404 previous: previous.as_str().map(ToString::to_string),
405 },
406 "cycle_id" => Self::CycleId {
407 previous: previous.as_str().map(ToString::to_string),
408 },
409 "due_date" => Self::DueDate {
410 previous: previous.as_str().map(ToString::to_string),
411 },
412 "parent_id" => Self::ParentId {
413 previous: previous.as_str().map(ToString::to_string),
414 },
415 "sort_order" => Self::SortOrder {
416 previous: previous.as_f64(),
417 },
418 "label_ids" => Self::LabelIds {
419 previous: parse_string_array(&previous),
420 },
421 "completed_at" => Self::CompletedAt {
422 previous: previous.as_str().map(ToString::to_string),
423 },
424 "other" => Self::Other {
425 field: value
426 .get("field")
427 .and_then(JsonValue::as_str)
428 .map(ToString::to_string)
429 .unwrap_or_else(|| "unknown".to_string()),
430 previous,
431 },
432 other => Self::Other {
433 field: other.to_string(),
434 previous,
435 },
436 })
437 }
438}
439
440#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
441pub struct LinearIssueEventPayload {
442 #[serde(flatten)]
443 pub common: LinearEventCommon,
444 pub issue: JsonValue,
445 #[serde(default)]
446 pub changes: Vec<LinearIssueChange>,
447}
448
449#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
450pub struct LinearIssueCommentEventPayload {
451 #[serde(flatten)]
452 pub common: LinearEventCommon,
453 pub comment: JsonValue,
454}
455
456#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
457pub struct LinearIssueLabelEventPayload {
458 #[serde(flatten)]
459 pub common: LinearEventCommon,
460 pub label: JsonValue,
461}
462
463#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
464pub struct LinearProjectEventPayload {
465 #[serde(flatten)]
466 pub common: LinearEventCommon,
467 pub project: JsonValue,
468}
469
470#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
471pub struct LinearCycleEventPayload {
472 #[serde(flatten)]
473 pub common: LinearEventCommon,
474 pub cycle: JsonValue,
475}
476
477#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
478pub struct LinearCustomerEventPayload {
479 #[serde(flatten)]
480 pub common: LinearEventCommon,
481 pub customer: JsonValue,
482}
483
484#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
485pub struct LinearCustomerRequestEventPayload {
486 #[serde(flatten)]
487 pub common: LinearEventCommon,
488 pub customer_request: JsonValue,
489}
490
491#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
492#[serde(untagged)]
493pub enum LinearEventPayload {
494 Issue(LinearIssueEventPayload),
495 IssueComment(LinearIssueCommentEventPayload),
496 IssueLabel(LinearIssueLabelEventPayload),
497 Project(LinearProjectEventPayload),
498 Cycle(LinearCycleEventPayload),
499 Customer(LinearCustomerEventPayload),
500 CustomerRequest(LinearCustomerRequestEventPayload),
501 Other(LinearEventCommon),
502}
503
504#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
505pub struct NotionPolledChangeEvent {
506 pub resource: String,
507 pub source_id: String,
508 pub entity_id: String,
509 pub high_water_mark: String,
510 pub before: Option<JsonValue>,
511 pub after: JsonValue,
512}
513
514#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
515pub struct NotionEventPayload {
516 pub event: String,
517 pub workspace_id: Option<String>,
518 pub request_id: Option<String>,
519 pub subscription_id: Option<String>,
520 pub integration_id: Option<String>,
521 pub attempt_number: Option<u32>,
522 pub entity_id: Option<String>,
523 pub entity_type: Option<String>,
524 pub api_version: Option<String>,
525 pub verification_token: Option<String>,
526 pub polled: Option<NotionPolledChangeEvent>,
527 pub raw: JsonValue,
528}
529
530#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
531pub struct CronEventPayload {
532 pub cron_id: Option<String>,
533 pub schedule: Option<String>,
534 #[serde(with = "time::serde::rfc3339")]
535 pub tick_at: OffsetDateTime,
536 pub raw: JsonValue,
537}
538
539#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
540pub struct GenericWebhookPayload {
541 pub source: Option<String>,
542 pub content_type: Option<String>,
543 pub raw: JsonValue,
544}
545
546#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
547pub struct A2aPushPayload {
548 pub task_id: Option<String>,
549 pub task_state: Option<String>,
550 pub artifact: Option<JsonValue>,
551 pub sender: Option<String>,
552 pub raw: JsonValue,
553 pub kind: String,
554}
555
556#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
557pub struct StreamEventPayload {
558 pub event: String,
559 pub source: Option<String>,
560 pub stream: Option<String>,
561 pub partition: Option<String>,
562 pub offset: Option<String>,
563 pub key: Option<String>,
564 pub timestamp: Option<String>,
565 #[serde(default)]
566 pub headers: BTreeMap<String, String>,
567 pub raw: JsonValue,
568}
569
570#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
571pub struct ExtensionProviderPayload {
572 pub provider: String,
573 pub schema_name: String,
574 pub raw: JsonValue,
575}
576
577#[allow(clippy::large_enum_variant)]
578#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
579#[serde(untagged)]
580pub enum ProviderPayload {
581 Known(KnownProviderPayload),
582 Extension(ExtensionProviderPayload),
583}
584
585impl ProviderPayload {
586 pub fn provider(&self) -> &str {
587 match self {
588 Self::Known(known) => known.provider(),
589 Self::Extension(payload) => payload.provider.as_str(),
590 }
591 }
592
593 pub fn normalize(
594 provider: &ProviderId,
595 kind: &str,
596 headers: &BTreeMap<String, String>,
597 raw: JsonValue,
598 ) -> Result<Self, ProviderCatalogError> {
599 provider_catalog()
600 .read()
601 .expect("provider catalog poisoned")
602 .normalize(provider, kind, headers, raw)
603 }
604}
605
606#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
607#[serde(tag = "provider")]
608pub enum KnownProviderPayload {
609 #[serde(rename = "github")]
610 GitHub(GitHubEventPayload),
611 #[serde(rename = "slack")]
612 Slack(Box<SlackEventPayload>),
613 #[serde(rename = "linear")]
614 Linear(LinearEventPayload),
615 #[serde(rename = "notion")]
616 Notion(Box<NotionEventPayload>),
617 #[serde(rename = "cron")]
618 Cron(CronEventPayload),
619 #[serde(rename = "webhook")]
620 Webhook(GenericWebhookPayload),
621 #[serde(rename = "a2a-push")]
622 A2aPush(A2aPushPayload),
623 #[serde(rename = "kafka")]
624 Kafka(StreamEventPayload),
625 #[serde(rename = "nats")]
626 Nats(StreamEventPayload),
627 #[serde(rename = "pulsar")]
628 Pulsar(StreamEventPayload),
629 #[serde(rename = "postgres-cdc")]
630 PostgresCdc(StreamEventPayload),
631 #[serde(rename = "email")]
632 Email(StreamEventPayload),
633 #[serde(rename = "websocket")]
634 Websocket(StreamEventPayload),
635}
636
637impl KnownProviderPayload {
638 pub fn provider(&self) -> &str {
639 match self {
640 Self::GitHub(_) => "github",
641 Self::Slack(_) => "slack",
642 Self::Linear(_) => "linear",
643 Self::Notion(_) => "notion",
644 Self::Cron(_) => "cron",
645 Self::Webhook(_) => "webhook",
646 Self::A2aPush(_) => "a2a-push",
647 Self::Kafka(_) => "kafka",
648 Self::Nats(_) => "nats",
649 Self::Pulsar(_) => "pulsar",
650 Self::PostgresCdc(_) => "postgres-cdc",
651 Self::Email(_) => "email",
652 Self::Websocket(_) => "websocket",
653 }
654 }
655}
656
657#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
658pub struct TriggerEvent {
659 pub id: TriggerEventId,
660 pub provider: ProviderId,
661 pub kind: String,
662 #[serde(with = "time::serde::rfc3339")]
663 pub received_at: OffsetDateTime,
664 #[serde(with = "time::serde::rfc3339::option")]
665 pub occurred_at: Option<OffsetDateTime>,
666 pub dedupe_key: String,
667 pub trace_id: TraceId,
668 pub tenant_id: Option<TenantId>,
669 pub headers: BTreeMap<String, String>,
670 #[serde(default, skip_serializing_if = "Option::is_none")]
671 pub batch: Option<Vec<JsonValue>>,
672 #[serde(
673 default,
674 skip_serializing_if = "Option::is_none",
675 serialize_with = "serialize_optional_bytes_b64",
676 deserialize_with = "deserialize_optional_bytes_b64"
677 )]
678 pub raw_body: Option<Vec<u8>>,
679 pub provider_payload: ProviderPayload,
680 pub signature_status: SignatureStatus,
681 #[serde(skip)]
682 pub dedupe_claimed: bool,
683}
684
685impl TriggerEvent {
686 #[allow(clippy::too_many_arguments)]
687 pub fn new(
688 provider: ProviderId,
689 kind: impl Into<String>,
690 occurred_at: Option<OffsetDateTime>,
691 dedupe_key: impl Into<String>,
692 tenant_id: Option<TenantId>,
693 headers: BTreeMap<String, String>,
694 provider_payload: ProviderPayload,
695 signature_status: SignatureStatus,
696 ) -> Self {
697 Self {
698 id: TriggerEventId::new(),
699 provider,
700 kind: kind.into(),
701 received_at: clock::now_utc(),
702 occurred_at,
703 dedupe_key: dedupe_key.into(),
704 trace_id: TraceId::new(),
705 tenant_id,
706 headers,
707 batch: None,
708 raw_body: None,
709 provider_payload,
710 signature_status,
711 dedupe_claimed: false,
712 }
713 }
714
715 pub fn dedupe_claimed(&self) -> bool {
716 self.dedupe_claimed
717 }
718
719 pub fn mark_dedupe_claimed(&mut self) {
720 self.dedupe_claimed = true;
721 }
722}
723
724#[derive(Clone, Debug, PartialEq, Eq)]
725pub struct HeaderRedactionPolicy {
726 safe_exact_names: BTreeSet<String>,
727}
728
729impl HeaderRedactionPolicy {
730 pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
731 self.safe_exact_names
732 .insert(name.into().to_ascii_lowercase());
733 self
734 }
735
736 fn should_keep(&self, name: &str) -> bool {
737 let lower = name.to_ascii_lowercase();
738 if self.safe_exact_names.contains(lower.as_str()) {
739 return true;
740 }
741 matches!(
742 lower.as_str(),
743 "user-agent"
744 | "request-id"
745 | "x-request-id"
746 | "x-correlation-id"
747 | "content-type"
748 | "content-length"
749 | "x-github-event"
750 | "x-github-delivery"
751 | "x-github-hook-id"
752 | "x-hub-signature-256"
753 | "x-slack-request-timestamp"
754 | "x-slack-signature"
755 | "x-linear-signature"
756 | "x-notion-signature"
757 | "x-a2a-signature"
758 | "x-a2a-delivery"
759 ) || lower.ends_with("-event")
760 || lower.ends_with("-delivery")
761 || lower.contains("timestamp")
762 || lower.contains("request-id")
763 }
764
765 fn should_redact(&self, name: &str) -> bool {
766 let lower = name.to_ascii_lowercase();
767 if self.should_keep(lower.as_str()) {
768 return false;
769 }
770 lower.contains("authorization")
771 || lower.contains("cookie")
772 || lower.contains("secret")
773 || lower.contains("token")
774 || lower.contains("key")
775 }
776}
777
778impl Default for HeaderRedactionPolicy {
779 fn default() -> Self {
780 Self {
781 safe_exact_names: BTreeSet::from([
782 "content-length".to_string(),
783 "content-type".to_string(),
784 "request-id".to_string(),
785 "user-agent".to_string(),
786 "x-a2a-delivery".to_string(),
787 "x-a2a-signature".to_string(),
788 "x-correlation-id".to_string(),
789 "x-github-delivery".to_string(),
790 "x-github-event".to_string(),
791 "x-github-hook-id".to_string(),
792 "x-hub-signature-256".to_string(),
793 "x-linear-signature".to_string(),
794 "x-notion-signature".to_string(),
795 "x-request-id".to_string(),
796 "x-slack-request-timestamp".to_string(),
797 "x-slack-signature".to_string(),
798 ]),
799 }
800 }
801}
802
803pub fn redact_headers(
804 headers: &BTreeMap<String, String>,
805 policy: &HeaderRedactionPolicy,
806) -> BTreeMap<String, String> {
807 headers
808 .iter()
809 .map(|(name, value)| {
810 if policy.should_redact(name) {
811 (name.clone(), REDACTED_HEADER_VALUE.to_string())
812 } else {
813 (name.clone(), value.clone())
814 }
815 })
816 .collect()
817}
818
819#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
820pub struct ProviderSecretRequirement {
821 pub name: String,
822 pub required: bool,
823 pub namespace: String,
824}
825
826#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
827pub struct ProviderOutboundMethod {
828 pub name: String,
829}
830
831#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
832#[serde(tag = "kind", rename_all = "snake_case")]
833pub enum SignatureVerificationMetadata {
834 #[default]
835 None,
836 Hmac {
837 variant: String,
838 raw_body: bool,
839 signature_header: String,
840 timestamp_header: Option<String>,
841 id_header: Option<String>,
842 default_tolerance_secs: Option<i64>,
843 digest: String,
844 encoding: String,
845 },
846}
847
848#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
849#[serde(tag = "kind", rename_all = "snake_case")]
850pub enum ProviderRuntimeMetadata {
851 Builtin {
852 connector: String,
853 default_signature_variant: Option<String>,
854 },
855 #[default]
856 Placeholder,
857}
858
859#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
860pub struct ProviderMetadata {
861 pub provider: String,
862 #[serde(default)]
863 pub kinds: Vec<String>,
864 pub schema_name: String,
865 #[serde(default)]
866 pub outbound_methods: Vec<ProviderOutboundMethod>,
867 #[serde(default)]
868 pub secret_requirements: Vec<ProviderSecretRequirement>,
869 #[serde(default)]
870 pub signature_verification: SignatureVerificationMetadata,
871 #[serde(default)]
872 pub runtime: ProviderRuntimeMetadata,
873}
874
875impl ProviderMetadata {
876 pub fn supports_kind(&self, kind: &str) -> bool {
877 self.kinds.iter().any(|candidate| candidate == kind)
878 }
879
880 pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
881 self.secret_requirements
882 .iter()
883 .filter(|requirement| requirement.required)
884 .map(|requirement| requirement.name.as_str())
885 }
886}
887
888pub trait ProviderSchema: Send + Sync {
889 fn provider_id(&self) -> &'static str;
890 fn harn_schema_name(&self) -> &'static str;
891 fn metadata(&self) -> ProviderMetadata {
892 ProviderMetadata {
893 provider: self.provider_id().to_string(),
894 schema_name: self.harn_schema_name().to_string(),
895 ..ProviderMetadata::default()
896 }
897 }
898 fn normalize(
899 &self,
900 kind: &str,
901 headers: &BTreeMap<String, String>,
902 raw: JsonValue,
903 ) -> Result<ProviderPayload, ProviderCatalogError>;
904}
905
906#[derive(Clone, Debug, PartialEq, Eq)]
907pub enum ProviderCatalogError {
908 DuplicateProvider(String),
909 UnknownProvider(String),
910}
911
912impl std::fmt::Display for ProviderCatalogError {
913 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
914 match self {
915 Self::DuplicateProvider(provider) => {
916 write!(f, "provider `{provider}` is already registered")
917 }
918 Self::UnknownProvider(provider) => write!(f, "provider `{provider}` is not registered"),
919 }
920 }
921}
922
923impl std::error::Error for ProviderCatalogError {}
924
925#[derive(Clone, Default)]
926pub struct ProviderCatalog {
927 providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
928}
929
930impl ProviderCatalog {
931 pub fn with_defaults() -> Self {
932 let mut catalog = Self::default();
933 for schema in default_provider_schemas() {
934 catalog
935 .register(schema)
936 .expect("default providers must register cleanly");
937 }
938 catalog
939 }
940
941 pub fn register(
942 &mut self,
943 schema: Arc<dyn ProviderSchema>,
944 ) -> Result<(), ProviderCatalogError> {
945 let provider = schema.provider_id().to_string();
946 if self.providers.contains_key(provider.as_str()) {
947 return Err(ProviderCatalogError::DuplicateProvider(provider));
948 }
949 self.providers.insert(provider, schema);
950 Ok(())
951 }
952
953 pub fn normalize(
954 &self,
955 provider: &ProviderId,
956 kind: &str,
957 headers: &BTreeMap<String, String>,
958 raw: JsonValue,
959 ) -> Result<ProviderPayload, ProviderCatalogError> {
960 let schema = self
961 .providers
962 .get(provider.as_str())
963 .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
964 schema.normalize(kind, headers, raw)
965 }
966
967 pub fn schema_names(&self) -> BTreeMap<String, String> {
968 self.providers
969 .iter()
970 .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
971 .collect()
972 }
973
974 pub fn entries(&self) -> Vec<ProviderMetadata> {
975 self.providers
976 .values()
977 .map(|schema| schema.metadata())
978 .collect()
979 }
980
981 pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
982 self.providers.get(provider).map(|schema| schema.metadata())
983 }
984}
985
986pub fn register_provider_schema(
987 schema: Arc<dyn ProviderSchema>,
988) -> Result<(), ProviderCatalogError> {
989 provider_catalog()
990 .write()
991 .expect("provider catalog poisoned")
992 .register(schema)
993}
994
995pub fn reset_provider_catalog() {
996 *provider_catalog()
997 .write()
998 .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
999}
1000
1001pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
1002 provider_catalog()
1003 .read()
1004 .expect("provider catalog poisoned")
1005 .schema_names()
1006}
1007
1008pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
1009 provider_catalog()
1010 .read()
1011 .expect("provider catalog poisoned")
1012 .entries()
1013}
1014
1015pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
1016 provider_catalog()
1017 .read()
1018 .expect("provider catalog poisoned")
1019 .metadata_for(provider)
1020}
1021
1022fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
1023 static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
1024 PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
1025}
1026
1027struct BuiltinProviderSchema {
1028 provider_id: &'static str,
1029 harn_schema_name: &'static str,
1030 metadata: ProviderMetadata,
1031 normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1032}
1033
1034impl ProviderSchema for BuiltinProviderSchema {
1035 fn provider_id(&self) -> &'static str {
1036 self.provider_id
1037 }
1038
1039 fn harn_schema_name(&self) -> &'static str {
1040 self.harn_schema_name
1041 }
1042
1043 fn metadata(&self) -> ProviderMetadata {
1044 self.metadata.clone()
1045 }
1046
1047 fn normalize(
1048 &self,
1049 kind: &str,
1050 headers: &BTreeMap<String, String>,
1051 raw: JsonValue,
1052 ) -> Result<ProviderPayload, ProviderCatalogError> {
1053 Ok((self.normalize)(kind, headers, raw))
1054 }
1055}
1056
1057fn provider_metadata_entry(
1058 provider: &str,
1059 kinds: &[&str],
1060 schema_name: &str,
1061 outbound_methods: &[&str],
1062 signature_verification: SignatureVerificationMetadata,
1063 secret_requirements: Vec<ProviderSecretRequirement>,
1064 runtime: ProviderRuntimeMetadata,
1065) -> ProviderMetadata {
1066 ProviderMetadata {
1067 provider: provider.to_string(),
1068 kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
1069 schema_name: schema_name.to_string(),
1070 outbound_methods: outbound_methods
1071 .iter()
1072 .map(|name| ProviderOutboundMethod {
1073 name: (*name).to_string(),
1074 })
1075 .collect(),
1076 secret_requirements,
1077 signature_verification,
1078 runtime,
1079 }
1080}
1081
1082fn hmac_signature_metadata(
1083 variant: &str,
1084 signature_header: &str,
1085 timestamp_header: Option<&str>,
1086 id_header: Option<&str>,
1087 default_tolerance_secs: Option<i64>,
1088 encoding: &str,
1089) -> SignatureVerificationMetadata {
1090 SignatureVerificationMetadata::Hmac {
1091 variant: variant.to_string(),
1092 raw_body: true,
1093 signature_header: signature_header.to_string(),
1094 timestamp_header: timestamp_header.map(ToString::to_string),
1095 id_header: id_header.map(ToString::to_string),
1096 default_tolerance_secs,
1097 digest: "sha256".to_string(),
1098 encoding: encoding.to_string(),
1099 }
1100}
1101
1102fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1103 ProviderSecretRequirement {
1104 name: name.to_string(),
1105 required: true,
1106 namespace: namespace.to_string(),
1107 }
1108}
1109
1110fn outbound_method(name: &str) -> ProviderOutboundMethod {
1111 ProviderOutboundMethod {
1112 name: name.to_string(),
1113 }
1114}
1115
1116fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1117 ProviderSecretRequirement {
1118 name: name.to_string(),
1119 required: false,
1120 namespace: namespace.to_string(),
1121 }
1122}
1123
1124fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1125 vec![
1126 Arc::new(BuiltinProviderSchema {
1127 provider_id: "github",
1128 harn_schema_name: "GitHubEventPayload",
1129 metadata: provider_metadata_entry(
1130 "github",
1131 &["webhook"],
1132 "GitHubEventPayload",
1133 &[],
1134 hmac_signature_metadata(
1135 "github",
1136 "X-Hub-Signature-256",
1137 None,
1138 Some("X-GitHub-Delivery"),
1139 None,
1140 "hex",
1141 ),
1142 vec![required_secret("signing_secret", "github")],
1143 ProviderRuntimeMetadata::Builtin {
1144 connector: "webhook".to_string(),
1145 default_signature_variant: Some("github".to_string()),
1146 },
1147 ),
1148 normalize: github_payload,
1149 }),
1150 Arc::new(BuiltinProviderSchema {
1151 provider_id: "slack",
1152 harn_schema_name: "SlackEventPayload",
1153 metadata: provider_metadata_entry(
1154 "slack",
1155 &["webhook"],
1156 "SlackEventPayload",
1157 &[
1158 "post_message",
1159 "update_message",
1160 "add_reaction",
1161 "open_view",
1162 "user_info",
1163 "api_call",
1164 "upload_file",
1165 ],
1166 hmac_signature_metadata(
1167 "slack",
1168 "X-Slack-Signature",
1169 Some("X-Slack-Request-Timestamp"),
1170 None,
1171 Some(300),
1172 "hex",
1173 ),
1174 vec![required_secret("signing_secret", "slack")],
1175 ProviderRuntimeMetadata::Builtin {
1176 connector: "slack".to_string(),
1177 default_signature_variant: Some("slack".to_string()),
1178 },
1179 ),
1180 normalize: slack_payload,
1181 }),
1182 Arc::new(BuiltinProviderSchema {
1183 provider_id: "linear",
1184 harn_schema_name: "LinearEventPayload",
1185 metadata: {
1186 let mut metadata = provider_metadata_entry(
1187 "linear",
1188 &["webhook"],
1189 "LinearEventPayload",
1190 &[],
1191 hmac_signature_metadata(
1192 "linear",
1193 "Linear-Signature",
1194 None,
1195 Some("Linear-Delivery"),
1196 Some(75),
1197 "hex",
1198 ),
1199 vec![
1200 required_secret("signing_secret", "linear"),
1201 optional_secret("access_token", "linear"),
1202 ],
1203 ProviderRuntimeMetadata::Builtin {
1204 connector: "linear".to_string(),
1205 default_signature_variant: Some("linear".to_string()),
1206 },
1207 );
1208 metadata.outbound_methods = vec![
1209 ProviderOutboundMethod {
1210 name: "list_issues".to_string(),
1211 },
1212 ProviderOutboundMethod {
1213 name: "update_issue".to_string(),
1214 },
1215 ProviderOutboundMethod {
1216 name: "create_comment".to_string(),
1217 },
1218 ProviderOutboundMethod {
1219 name: "search".to_string(),
1220 },
1221 ProviderOutboundMethod {
1222 name: "graphql".to_string(),
1223 },
1224 ];
1225 metadata
1226 },
1227 normalize: linear_payload,
1228 }),
1229 Arc::new(BuiltinProviderSchema {
1230 provider_id: "notion",
1231 harn_schema_name: "NotionEventPayload",
1232 metadata: {
1233 let mut metadata = provider_metadata_entry(
1234 "notion",
1235 &["webhook", "poll"],
1236 "NotionEventPayload",
1237 &[],
1238 hmac_signature_metadata(
1239 "notion",
1240 "X-Notion-Signature",
1241 None,
1242 None,
1243 None,
1244 "hex",
1245 ),
1246 vec![required_secret("verification_token", "notion")],
1247 ProviderRuntimeMetadata::Builtin {
1248 connector: "notion".to_string(),
1249 default_signature_variant: Some("notion".to_string()),
1250 },
1251 );
1252 metadata.outbound_methods = vec![
1253 outbound_method("get_page"),
1254 outbound_method("update_page"),
1255 outbound_method("append_blocks"),
1256 outbound_method("query_database"),
1257 outbound_method("search"),
1258 outbound_method("create_comment"),
1259 outbound_method("api_call"),
1260 ];
1261 metadata
1262 },
1263 normalize: notion_payload,
1264 }),
1265 Arc::new(BuiltinProviderSchema {
1266 provider_id: "cron",
1267 harn_schema_name: "CronEventPayload",
1268 metadata: provider_metadata_entry(
1269 "cron",
1270 &["cron"],
1271 "CronEventPayload",
1272 &[],
1273 SignatureVerificationMetadata::None,
1274 Vec::new(),
1275 ProviderRuntimeMetadata::Builtin {
1276 connector: "cron".to_string(),
1277 default_signature_variant: None,
1278 },
1279 ),
1280 normalize: cron_payload,
1281 }),
1282 Arc::new(BuiltinProviderSchema {
1283 provider_id: "webhook",
1284 harn_schema_name: "GenericWebhookPayload",
1285 metadata: provider_metadata_entry(
1286 "webhook",
1287 &["webhook"],
1288 "GenericWebhookPayload",
1289 &[],
1290 hmac_signature_metadata(
1291 "standard",
1292 "webhook-signature",
1293 Some("webhook-timestamp"),
1294 Some("webhook-id"),
1295 Some(300),
1296 "base64",
1297 ),
1298 vec![required_secret("signing_secret", "webhook")],
1299 ProviderRuntimeMetadata::Builtin {
1300 connector: "webhook".to_string(),
1301 default_signature_variant: Some("standard".to_string()),
1302 },
1303 ),
1304 normalize: webhook_payload,
1305 }),
1306 Arc::new(BuiltinProviderSchema {
1307 provider_id: "a2a-push",
1308 harn_schema_name: "A2aPushPayload",
1309 metadata: provider_metadata_entry(
1310 "a2a-push",
1311 &["a2a-push"],
1312 "A2aPushPayload",
1313 &[],
1314 SignatureVerificationMetadata::None,
1315 Vec::new(),
1316 ProviderRuntimeMetadata::Builtin {
1317 connector: "a2a-push".to_string(),
1318 default_signature_variant: None,
1319 },
1320 ),
1321 normalize: a2a_push_payload,
1322 }),
1323 Arc::new(stream_provider_schema("kafka", kafka_payload)),
1324 Arc::new(stream_provider_schema("nats", nats_payload)),
1325 Arc::new(stream_provider_schema("pulsar", pulsar_payload)),
1326 Arc::new(stream_provider_schema("postgres-cdc", postgres_cdc_payload)),
1327 Arc::new(stream_provider_schema("email", email_payload)),
1328 Arc::new(stream_provider_schema("websocket", websocket_payload)),
1329 ]
1330}
1331
1332fn stream_provider_schema(
1333 provider_id: &'static str,
1334 normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1335) -> BuiltinProviderSchema {
1336 BuiltinProviderSchema {
1337 provider_id,
1338 harn_schema_name: "StreamEventPayload",
1339 metadata: provider_metadata_entry(
1340 provider_id,
1341 &["stream"],
1342 "StreamEventPayload",
1343 &[],
1344 SignatureVerificationMetadata::None,
1345 Vec::new(),
1346 ProviderRuntimeMetadata::Builtin {
1347 connector: "stream".to_string(),
1348 default_signature_variant: None,
1349 },
1350 ),
1351 normalize,
1352 }
1353}
1354
1355fn github_payload(
1356 kind: &str,
1357 headers: &BTreeMap<String, String>,
1358 raw: JsonValue,
1359) -> ProviderPayload {
1360 let common = GitHubEventCommon {
1361 event: kind.to_string(),
1362 action: raw
1363 .get("action")
1364 .and_then(JsonValue::as_str)
1365 .map(ToString::to_string),
1366 delivery_id: headers.get("X-GitHub-Delivery").cloned(),
1367 installation_id: raw
1368 .get("installation")
1369 .and_then(|value| value.get("id"))
1370 .and_then(JsonValue::as_i64),
1371 raw: raw.clone(),
1372 };
1373 let payload = match kind {
1374 "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1375 common,
1376 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1377 }),
1378 "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1379 common,
1380 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1381 }),
1382 "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1383 common,
1384 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1385 comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1386 }),
1387 "pull_request_review" => {
1388 GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1389 common,
1390 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1391 review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1392 })
1393 }
1394 "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1395 common,
1396 commits: raw
1397 .get("commits")
1398 .and_then(JsonValue::as_array)
1399 .cloned()
1400 .unwrap_or_default(),
1401 distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1402 }),
1403 "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1404 common,
1405 workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1406 }),
1407 "deployment_status" => {
1408 GitHubEventPayload::DeploymentStatus(GitHubDeploymentStatusEventPayload {
1409 common,
1410 deployment_status: raw
1411 .get("deployment_status")
1412 .cloned()
1413 .unwrap_or(JsonValue::Null),
1414 deployment: raw.get("deployment").cloned().unwrap_or(JsonValue::Null),
1415 })
1416 }
1417 "check_run" => GitHubEventPayload::CheckRun(GitHubCheckRunEventPayload {
1418 common,
1419 check_run: raw.get("check_run").cloned().unwrap_or(JsonValue::Null),
1420 }),
1421 _ => GitHubEventPayload::Other(common),
1422 };
1423 ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1424}
1425
1426fn slack_payload(
1427 kind: &str,
1428 _headers: &BTreeMap<String, String>,
1429 raw: JsonValue,
1430) -> ProviderPayload {
1431 let event = raw.get("event");
1432 let common = SlackEventCommon {
1433 event: kind.to_string(),
1434 event_id: raw
1435 .get("event_id")
1436 .and_then(JsonValue::as_str)
1437 .map(ToString::to_string),
1438 api_app_id: raw
1439 .get("api_app_id")
1440 .and_then(JsonValue::as_str)
1441 .map(ToString::to_string),
1442 team_id: raw
1443 .get("team_id")
1444 .and_then(JsonValue::as_str)
1445 .map(ToString::to_string),
1446 channel_id: slack_channel_id(event),
1447 user_id: slack_user_id(event),
1448 event_ts: event
1449 .and_then(|value| value.get("event_ts"))
1450 .and_then(JsonValue::as_str)
1451 .map(ToString::to_string),
1452 raw: raw.clone(),
1453 };
1454 let payload = match kind {
1455 kind if kind == "message" || kind.starts_with("message.") => {
1456 SlackEventPayload::Message(SlackMessageEventPayload {
1457 subtype: event
1458 .and_then(|value| value.get("subtype"))
1459 .and_then(JsonValue::as_str)
1460 .map(ToString::to_string),
1461 channel_type: event
1462 .and_then(|value| value.get("channel_type"))
1463 .and_then(JsonValue::as_str)
1464 .map(ToString::to_string),
1465 channel: event
1466 .and_then(|value| value.get("channel"))
1467 .and_then(JsonValue::as_str)
1468 .map(ToString::to_string),
1469 user: event
1470 .and_then(|value| value.get("user"))
1471 .and_then(JsonValue::as_str)
1472 .map(ToString::to_string),
1473 text: event
1474 .and_then(|value| value.get("text"))
1475 .and_then(JsonValue::as_str)
1476 .map(ToString::to_string),
1477 ts: event
1478 .and_then(|value| value.get("ts"))
1479 .and_then(JsonValue::as_str)
1480 .map(ToString::to_string),
1481 thread_ts: event
1482 .and_then(|value| value.get("thread_ts"))
1483 .and_then(JsonValue::as_str)
1484 .map(ToString::to_string),
1485 common,
1486 })
1487 }
1488 "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1489 channel: event
1490 .and_then(|value| value.get("channel"))
1491 .and_then(JsonValue::as_str)
1492 .map(ToString::to_string),
1493 user: event
1494 .and_then(|value| value.get("user"))
1495 .and_then(JsonValue::as_str)
1496 .map(ToString::to_string),
1497 text: event
1498 .and_then(|value| value.get("text"))
1499 .and_then(JsonValue::as_str)
1500 .map(ToString::to_string),
1501 ts: event
1502 .and_then(|value| value.get("ts"))
1503 .and_then(JsonValue::as_str)
1504 .map(ToString::to_string),
1505 thread_ts: event
1506 .and_then(|value| value.get("thread_ts"))
1507 .and_then(JsonValue::as_str)
1508 .map(ToString::to_string),
1509 common,
1510 }),
1511 "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1512 reaction: event
1513 .and_then(|value| value.get("reaction"))
1514 .and_then(JsonValue::as_str)
1515 .map(ToString::to_string),
1516 item_user: event
1517 .and_then(|value| value.get("item_user"))
1518 .and_then(JsonValue::as_str)
1519 .map(ToString::to_string),
1520 item: event
1521 .and_then(|value| value.get("item"))
1522 .cloned()
1523 .unwrap_or(JsonValue::Null),
1524 common,
1525 }),
1526 "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1527 user: event
1528 .and_then(|value| value.get("user"))
1529 .and_then(JsonValue::as_str)
1530 .map(ToString::to_string),
1531 channel: event
1532 .and_then(|value| value.get("channel"))
1533 .and_then(JsonValue::as_str)
1534 .map(ToString::to_string),
1535 tab: event
1536 .and_then(|value| value.get("tab"))
1537 .and_then(JsonValue::as_str)
1538 .map(ToString::to_string),
1539 view: event
1540 .and_then(|value| value.get("view"))
1541 .cloned()
1542 .unwrap_or(JsonValue::Null),
1543 common,
1544 }),
1545 "assistant_thread_started" => {
1546 let assistant_thread = event
1547 .and_then(|value| value.get("assistant_thread"))
1548 .cloned()
1549 .unwrap_or(JsonValue::Null);
1550 SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1551 thread_ts: assistant_thread
1552 .get("thread_ts")
1553 .and_then(JsonValue::as_str)
1554 .map(ToString::to_string),
1555 context: assistant_thread
1556 .get("context")
1557 .cloned()
1558 .unwrap_or(JsonValue::Null),
1559 assistant_thread,
1560 common,
1561 })
1562 }
1563 _ => SlackEventPayload::Other(common),
1564 };
1565 ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1566}
1567
1568fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1569 event
1570 .and_then(|value| value.get("channel"))
1571 .and_then(JsonValue::as_str)
1572 .map(ToString::to_string)
1573 .or_else(|| {
1574 event
1575 .and_then(|value| value.get("item"))
1576 .and_then(|value| value.get("channel"))
1577 .and_then(JsonValue::as_str)
1578 .map(ToString::to_string)
1579 })
1580 .or_else(|| {
1581 event
1582 .and_then(|value| value.get("channel"))
1583 .and_then(|value| value.get("id"))
1584 .and_then(JsonValue::as_str)
1585 .map(ToString::to_string)
1586 })
1587 .or_else(|| {
1588 event
1589 .and_then(|value| value.get("assistant_thread"))
1590 .and_then(|value| value.get("channel_id"))
1591 .and_then(JsonValue::as_str)
1592 .map(ToString::to_string)
1593 })
1594}
1595
1596fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1597 event
1598 .and_then(|value| value.get("user"))
1599 .and_then(JsonValue::as_str)
1600 .map(ToString::to_string)
1601 .or_else(|| {
1602 event
1603 .and_then(|value| value.get("user"))
1604 .and_then(|value| value.get("id"))
1605 .and_then(JsonValue::as_str)
1606 .map(ToString::to_string)
1607 })
1608 .or_else(|| {
1609 event
1610 .and_then(|value| value.get("item_user"))
1611 .and_then(JsonValue::as_str)
1612 .map(ToString::to_string)
1613 })
1614 .or_else(|| {
1615 event
1616 .and_then(|value| value.get("assistant_thread"))
1617 .and_then(|value| value.get("user_id"))
1618 .and_then(JsonValue::as_str)
1619 .map(ToString::to_string)
1620 })
1621}
1622
1623fn linear_payload(
1624 _kind: &str,
1625 headers: &BTreeMap<String, String>,
1626 raw: JsonValue,
1627) -> ProviderPayload {
1628 let common = linear_event_common(headers, &raw);
1629 let event = common.event.clone();
1630 let payload = match event.as_str() {
1631 "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1632 common,
1633 issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1634 changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1635 }),
1636 "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1637 common,
1638 comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1639 }),
1640 "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1641 common,
1642 label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1643 }),
1644 "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1645 common,
1646 project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1647 }),
1648 "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1649 common,
1650 cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1651 }),
1652 "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
1653 common,
1654 customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1655 }),
1656 "customer_request" => {
1657 LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
1658 common,
1659 customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1660 })
1661 }
1662 _ => LinearEventPayload::Other(common),
1663 };
1664 ProviderPayload::Known(KnownProviderPayload::Linear(payload))
1665}
1666
1667fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
1668 LinearEventCommon {
1669 event: linear_event_name(
1670 raw.get("type")
1671 .and_then(JsonValue::as_str)
1672 .or_else(|| headers.get("Linear-Event").map(String::as_str)),
1673 ),
1674 action: raw
1675 .get("action")
1676 .and_then(JsonValue::as_str)
1677 .map(ToString::to_string),
1678 delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
1679 organization_id: raw
1680 .get("organizationId")
1681 .and_then(JsonValue::as_str)
1682 .map(ToString::to_string),
1683 webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
1684 webhook_id: raw
1685 .get("webhookId")
1686 .and_then(JsonValue::as_str)
1687 .map(ToString::to_string),
1688 url: raw
1689 .get("url")
1690 .and_then(JsonValue::as_str)
1691 .map(ToString::to_string),
1692 created_at: raw
1693 .get("createdAt")
1694 .and_then(JsonValue::as_str)
1695 .map(ToString::to_string),
1696 actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
1697 raw: raw.clone(),
1698 }
1699}
1700
1701fn linear_event_name(raw_type: Option<&str>) -> String {
1702 match raw_type.unwrap_or_default().to_ascii_lowercase().as_str() {
1703 "issue" => "issue".to_string(),
1704 "comment" | "issuecomment" | "issue_comment" => "comment".to_string(),
1705 "issuelabel" | "issue_label" => "issue_label".to_string(),
1706 "project" | "projectupdate" | "project_update" => "project".to_string(),
1707 "cycle" => "cycle".to_string(),
1708 "customer" => "customer".to_string(),
1709 "customerrequest" | "customer_request" => "customer_request".to_string(),
1710 other if !other.is_empty() => other.to_string(),
1711 _ => "other".to_string(),
1712 }
1713}
1714
1715fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
1716 let Some(JsonValue::Object(fields)) = updated_from else {
1717 return Vec::new();
1718 };
1719 let mut changes = Vec::new();
1720 for (field, previous) in fields {
1721 let change = match field.as_str() {
1722 "title" => LinearIssueChange::Title {
1723 previous: previous.as_str().map(ToString::to_string),
1724 },
1725 "description" => LinearIssueChange::Description {
1726 previous: previous.as_str().map(ToString::to_string),
1727 },
1728 "priority" => LinearIssueChange::Priority {
1729 previous: parse_json_i64ish(previous),
1730 },
1731 "estimate" => LinearIssueChange::Estimate {
1732 previous: parse_json_i64ish(previous),
1733 },
1734 "stateId" => LinearIssueChange::StateId {
1735 previous: previous.as_str().map(ToString::to_string),
1736 },
1737 "teamId" => LinearIssueChange::TeamId {
1738 previous: previous.as_str().map(ToString::to_string),
1739 },
1740 "assigneeId" => LinearIssueChange::AssigneeId {
1741 previous: previous.as_str().map(ToString::to_string),
1742 },
1743 "projectId" => LinearIssueChange::ProjectId {
1744 previous: previous.as_str().map(ToString::to_string),
1745 },
1746 "cycleId" => LinearIssueChange::CycleId {
1747 previous: previous.as_str().map(ToString::to_string),
1748 },
1749 "dueDate" => LinearIssueChange::DueDate {
1750 previous: previous.as_str().map(ToString::to_string),
1751 },
1752 "parentId" => LinearIssueChange::ParentId {
1753 previous: previous.as_str().map(ToString::to_string),
1754 },
1755 "sortOrder" => LinearIssueChange::SortOrder {
1756 previous: previous.as_f64(),
1757 },
1758 "labelIds" => LinearIssueChange::LabelIds {
1759 previous: parse_string_array(previous),
1760 },
1761 "completedAt" => LinearIssueChange::CompletedAt {
1762 previous: previous.as_str().map(ToString::to_string),
1763 },
1764 _ => LinearIssueChange::Other {
1765 field: field.clone(),
1766 previous: previous.clone(),
1767 },
1768 };
1769 changes.push(change);
1770 }
1771 changes
1772}
1773
1774fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
1775 value
1776 .as_i64()
1777 .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
1778 .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
1779}
1780
1781fn parse_string_array(value: &JsonValue) -> Vec<String> {
1782 let Some(array) = value.as_array() else {
1783 return Vec::new();
1784 };
1785 array
1786 .iter()
1787 .filter_map(|entry| {
1788 entry.as_str().map(ToString::to_string).or_else(|| {
1789 entry
1790 .get("id")
1791 .and_then(JsonValue::as_str)
1792 .map(ToString::to_string)
1793 })
1794 })
1795 .collect()
1796}
1797
1798fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
1799 headers
1800 .iter()
1801 .find(|(key, _)| key.eq_ignore_ascii_case(name))
1802 .map(|(_, value)| value.as_str())
1803}
1804
1805fn notion_payload(
1806 kind: &str,
1807 headers: &BTreeMap<String, String>,
1808 raw: JsonValue,
1809) -> ProviderPayload {
1810 let workspace_id = raw
1811 .get("workspace_id")
1812 .and_then(JsonValue::as_str)
1813 .map(ToString::to_string);
1814 ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
1815 event: kind.to_string(),
1816 workspace_id,
1817 request_id: headers
1818 .get("request-id")
1819 .cloned()
1820 .or_else(|| headers.get("x-request-id").cloned()),
1821 subscription_id: raw
1822 .get("subscription_id")
1823 .and_then(JsonValue::as_str)
1824 .map(ToString::to_string),
1825 integration_id: raw
1826 .get("integration_id")
1827 .and_then(JsonValue::as_str)
1828 .map(ToString::to_string),
1829 attempt_number: raw
1830 .get("attempt_number")
1831 .and_then(JsonValue::as_u64)
1832 .and_then(|value| u32::try_from(value).ok()),
1833 entity_id: raw
1834 .get("entity")
1835 .and_then(|value| value.get("id"))
1836 .and_then(JsonValue::as_str)
1837 .map(ToString::to_string),
1838 entity_type: raw
1839 .get("entity")
1840 .and_then(|value| value.get("type"))
1841 .and_then(JsonValue::as_str)
1842 .map(ToString::to_string),
1843 api_version: raw
1844 .get("api_version")
1845 .and_then(JsonValue::as_str)
1846 .map(ToString::to_string),
1847 verification_token: raw
1848 .get("verification_token")
1849 .and_then(JsonValue::as_str)
1850 .map(ToString::to_string),
1851 polled: None,
1852 raw,
1853 })))
1854}
1855
1856fn cron_payload(
1857 _kind: &str,
1858 _headers: &BTreeMap<String, String>,
1859 raw: JsonValue,
1860) -> ProviderPayload {
1861 let cron_id = raw
1862 .get("cron_id")
1863 .and_then(JsonValue::as_str)
1864 .map(ToString::to_string);
1865 let schedule = raw
1866 .get("schedule")
1867 .and_then(JsonValue::as_str)
1868 .map(ToString::to_string);
1869 let tick_at = raw
1870 .get("tick_at")
1871 .and_then(JsonValue::as_str)
1872 .and_then(parse_rfc3339)
1873 .unwrap_or_else(OffsetDateTime::now_utc);
1874 ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
1875 cron_id,
1876 schedule,
1877 tick_at,
1878 raw,
1879 }))
1880}
1881
1882fn webhook_payload(
1883 _kind: &str,
1884 headers: &BTreeMap<String, String>,
1885 raw: JsonValue,
1886) -> ProviderPayload {
1887 ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1888 source: headers.get("X-Webhook-Source").cloned(),
1889 content_type: headers.get("Content-Type").cloned(),
1890 raw,
1891 }))
1892}
1893
1894fn a2a_push_payload(
1895 _kind: &str,
1896 _headers: &BTreeMap<String, String>,
1897 raw: JsonValue,
1898) -> ProviderPayload {
1899 let task_id = raw
1900 .get("task_id")
1901 .and_then(JsonValue::as_str)
1902 .map(ToString::to_string);
1903 let sender = raw
1904 .get("sender")
1905 .and_then(JsonValue::as_str)
1906 .map(ToString::to_string);
1907 let task_state = raw
1908 .pointer("/status/state")
1909 .or_else(|| raw.pointer("/statusUpdate/status/state"))
1910 .and_then(JsonValue::as_str)
1911 .map(|state| match state {
1912 "cancelled" => "canceled".to_string(),
1913 other => other.to_string(),
1914 });
1915 let artifact = raw
1916 .pointer("/artifactUpdate/artifact")
1917 .or_else(|| raw.get("artifact"))
1918 .cloned();
1919 let kind = task_state
1920 .as_deref()
1921 .map(|state| format!("a2a.task.{state}"))
1922 .unwrap_or_else(|| "a2a.task.update".to_string());
1923 ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
1924 task_id,
1925 task_state,
1926 artifact,
1927 sender,
1928 raw,
1929 kind,
1930 }))
1931}
1932
1933fn kafka_payload(
1934 kind: &str,
1935 headers: &BTreeMap<String, String>,
1936 raw: JsonValue,
1937) -> ProviderPayload {
1938 ProviderPayload::Known(KnownProviderPayload::Kafka(stream_payload(
1939 kind, headers, raw,
1940 )))
1941}
1942
1943fn nats_payload(kind: &str, headers: &BTreeMap<String, String>, raw: JsonValue) -> ProviderPayload {
1944 ProviderPayload::Known(KnownProviderPayload::Nats(stream_payload(
1945 kind, headers, raw,
1946 )))
1947}
1948
1949fn pulsar_payload(
1950 kind: &str,
1951 headers: &BTreeMap<String, String>,
1952 raw: JsonValue,
1953) -> ProviderPayload {
1954 ProviderPayload::Known(KnownProviderPayload::Pulsar(stream_payload(
1955 kind, headers, raw,
1956 )))
1957}
1958
1959fn postgres_cdc_payload(
1960 kind: &str,
1961 headers: &BTreeMap<String, String>,
1962 raw: JsonValue,
1963) -> ProviderPayload {
1964 ProviderPayload::Known(KnownProviderPayload::PostgresCdc(stream_payload(
1965 kind, headers, raw,
1966 )))
1967}
1968
1969fn email_payload(
1970 kind: &str,
1971 headers: &BTreeMap<String, String>,
1972 raw: JsonValue,
1973) -> ProviderPayload {
1974 ProviderPayload::Known(KnownProviderPayload::Email(stream_payload(
1975 kind, headers, raw,
1976 )))
1977}
1978
1979fn websocket_payload(
1980 kind: &str,
1981 headers: &BTreeMap<String, String>,
1982 raw: JsonValue,
1983) -> ProviderPayload {
1984 ProviderPayload::Known(KnownProviderPayload::Websocket(stream_payload(
1985 kind, headers, raw,
1986 )))
1987}
1988
1989fn stream_payload(
1990 kind: &str,
1991 headers: &BTreeMap<String, String>,
1992 raw: JsonValue,
1993) -> StreamEventPayload {
1994 StreamEventPayload {
1995 event: kind.to_string(),
1996 source: json_stringish(&raw, &["source", "connector", "origin"]),
1997 stream: json_stringish(
1998 &raw,
1999 &["stream", "topic", "subject", "channel", "mailbox", "slot"],
2000 ),
2001 partition: json_stringish(&raw, &["partition", "shard", "consumer"]),
2002 offset: json_stringish(&raw, &["offset", "sequence", "lsn", "message_id"]),
2003 key: json_stringish(&raw, &["key", "message_key", "id", "event_id"]),
2004 timestamp: json_stringish(&raw, &["timestamp", "occurred_at", "received_at", "ts"]),
2005 headers: headers.clone(),
2006 raw,
2007 }
2008}
2009
2010fn json_stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
2011 fields.iter().find_map(|field| {
2012 let value = raw.get(*field)?;
2013 value
2014 .as_str()
2015 .map(ToString::to_string)
2016 .or_else(|| parse_json_i64ish(value).map(|number| number.to_string()))
2017 .or_else(|| value.as_u64().map(|number| number.to_string()))
2018 })
2019}
2020
2021fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
2022 OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
2023}
2024
2025#[cfg(test)]
2026mod tests {
2027 use super::*;
2028
2029 fn sample_headers() -> BTreeMap<String, String> {
2030 BTreeMap::from([
2031 ("Authorization".to_string(), "Bearer secret".to_string()),
2032 ("Cookie".to_string(), "session=abc".to_string()),
2033 ("User-Agent".to_string(), "GitHub-Hookshot/123".to_string()),
2034 ("X-GitHub-Delivery".to_string(), "delivery-123".to_string()),
2035 ("X-GitHub-Event".to_string(), "issues".to_string()),
2036 ("X-Webhook-Token".to_string(), "token".to_string()),
2037 ])
2038 }
2039
2040 #[test]
2041 fn default_redaction_policy_keeps_safe_headers() {
2042 let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
2043 assert_eq!(redacted.get("User-Agent").unwrap(), "GitHub-Hookshot/123");
2044 assert_eq!(redacted.get("X-GitHub-Delivery").unwrap(), "delivery-123");
2045 assert_eq!(
2046 redacted.get("Authorization").unwrap(),
2047 REDACTED_HEADER_VALUE
2048 );
2049 assert_eq!(redacted.get("Cookie").unwrap(), REDACTED_HEADER_VALUE);
2050 assert_eq!(
2051 redacted.get("X-Webhook-Token").unwrap(),
2052 REDACTED_HEADER_VALUE
2053 );
2054 }
2055
2056 #[test]
2057 fn provider_catalog_rejects_duplicates() {
2058 let mut catalog = ProviderCatalog::default();
2059 catalog
2060 .register(Arc::new(BuiltinProviderSchema {
2061 provider_id: "github",
2062 harn_schema_name: "GitHubEventPayload",
2063 metadata: provider_metadata_entry(
2064 "github",
2065 &["webhook"],
2066 "GitHubEventPayload",
2067 &[],
2068 SignatureVerificationMetadata::None,
2069 Vec::new(),
2070 ProviderRuntimeMetadata::Placeholder,
2071 ),
2072 normalize: github_payload,
2073 }))
2074 .unwrap();
2075 let error = catalog
2076 .register(Arc::new(BuiltinProviderSchema {
2077 provider_id: "github",
2078 harn_schema_name: "GitHubEventPayload",
2079 metadata: provider_metadata_entry(
2080 "github",
2081 &["webhook"],
2082 "GitHubEventPayload",
2083 &[],
2084 SignatureVerificationMetadata::None,
2085 Vec::new(),
2086 ProviderRuntimeMetadata::Placeholder,
2087 ),
2088 normalize: github_payload,
2089 }))
2090 .unwrap_err();
2091 assert_eq!(
2092 error,
2093 ProviderCatalogError::DuplicateProvider("github".to_string())
2094 );
2095 }
2096
2097 #[test]
2098 fn registered_provider_metadata_marks_builtin_connectors() {
2099 let entries = registered_provider_metadata();
2100 let builtin: Vec<&ProviderMetadata> = entries
2101 .iter()
2102 .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
2103 .collect();
2104
2105 assert_eq!(builtin.len(), 13);
2106 assert!(builtin.iter().any(|entry| entry.provider == "a2a-push"));
2107 assert!(builtin.iter().any(|entry| entry.provider == "cron"));
2108 assert!(builtin.iter().any(|entry| entry.provider == "github"));
2109 assert!(builtin.iter().any(|entry| entry.provider == "linear"));
2110 assert!(builtin.iter().any(|entry| entry.provider == "notion"));
2111 assert!(builtin.iter().any(|entry| entry.provider == "slack"));
2112 assert!(builtin.iter().any(|entry| entry.provider == "webhook"));
2113 let kafka = entries
2114 .iter()
2115 .find(|entry| entry.provider == "kafka")
2116 .expect("kafka stream provider");
2117 assert_eq!(kafka.kinds, vec!["stream".to_string()]);
2118 assert_eq!(kafka.schema_name, "StreamEventPayload");
2119 assert!(matches!(
2120 kafka.runtime,
2121 ProviderRuntimeMetadata::Builtin {
2122 ref connector,
2123 default_signature_variant: None
2124 } if connector == "stream"
2125 ));
2126 }
2127
2128 #[test]
2129 fn trigger_event_round_trip_is_stable() {
2130 let provider = ProviderId::from("github");
2131 let headers = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
2132 let payload = ProviderPayload::normalize(
2133 &provider,
2134 "issues",
2135 &sample_headers(),
2136 serde_json::json!({
2137 "action": "opened",
2138 "installation": {"id": 42},
2139 "issue": {"number": 99}
2140 }),
2141 )
2142 .unwrap();
2143 let event = TriggerEvent {
2144 id: TriggerEventId("trigger_evt_fixed".to_string()),
2145 provider,
2146 kind: "issues".to_string(),
2147 received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
2148 occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
2149 dedupe_key: "delivery-123".to_string(),
2150 trace_id: TraceId("trace_fixed".to_string()),
2151 tenant_id: Some(TenantId("tenant_1".to_string())),
2152 headers,
2153 provider_payload: payload,
2154 signature_status: SignatureStatus::Verified,
2155 dedupe_claimed: false,
2156 batch: None,
2157 raw_body: Some(vec![0, 159, 255, 10]),
2158 };
2159
2160 let once = serde_json::to_value(&event).unwrap();
2161 assert_eq!(once["raw_body"], serde_json::json!("AJ//Cg=="));
2162 let decoded: TriggerEvent = serde_json::from_value(once.clone()).unwrap();
2163 let twice = serde_json::to_value(&decoded).unwrap();
2164 assert_eq!(decoded, event);
2165 assert_eq!(once, twice);
2166 }
2167
2168 #[test]
2169 fn unknown_provider_errors() {
2170 let error = ProviderPayload::normalize(
2171 &ProviderId::from("custom-provider"),
2172 "thing.happened",
2173 &BTreeMap::new(),
2174 serde_json::json!({"ok": true}),
2175 )
2176 .unwrap_err();
2177 assert_eq!(
2178 error,
2179 ProviderCatalogError::UnknownProvider("custom-provider".to_string())
2180 );
2181 }
2182
2183 #[test]
2184 fn provider_normalizes_stream_payloads() {
2185 let payload = ProviderPayload::normalize(
2186 &ProviderId::from("kafka"),
2187 "quote.tick",
2188 &BTreeMap::from([("x-source".to_string(), "feed".to_string())]),
2189 serde_json::json!({
2190 "topic": "quotes",
2191 "partition": 7,
2192 "offset": "42",
2193 "key": "AAPL",
2194 "timestamp": "2026-04-21T12:00:00Z"
2195 }),
2196 )
2197 .expect("stream payload");
2198 let ProviderPayload::Known(KnownProviderPayload::Kafka(payload)) = payload else {
2199 panic!("expected kafka stream payload")
2200 };
2201 assert_eq!(payload.event, "quote.tick");
2202 assert_eq!(payload.stream.as_deref(), Some("quotes"));
2203 assert_eq!(payload.partition.as_deref(), Some("7"));
2204 assert_eq!(payload.offset.as_deref(), Some("42"));
2205 assert_eq!(payload.key.as_deref(), Some("AAPL"));
2206 assert_eq!(payload.timestamp.as_deref(), Some("2026-04-21T12:00:00Z"));
2207 }
2208}