1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
11const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13fn serialize_optional_bytes_b64<S>(
14 value: &Option<Vec<u8>>,
15 serializer: S,
16) -> Result<S::Ok, S::Error>
17where
18 S: Serializer,
19{
20 use base64::Engine;
21
22 match value {
23 Some(bytes) => {
24 serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
25 }
26 None => serializer.serialize_none(),
27 }
28}
29
30fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
31where
32 D: Deserializer<'de>,
33{
34 use base64::Engine;
35
36 let encoded = Option::<String>::deserialize(deserializer)?;
37 encoded
38 .map(|text| {
39 base64::engine::general_purpose::STANDARD
40 .decode(text.as_bytes())
41 .map_err(serde::de::Error::custom)
42 })
43 .transpose()
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
47#[serde(transparent)]
48pub struct TriggerEventId(pub String);
49
50impl TriggerEventId {
51 pub fn new() -> Self {
52 Self(format!("trigger_evt_{}", Uuid::now_v7()))
53 }
54}
55
56impl Default for TriggerEventId {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
63#[serde(transparent)]
64pub struct ProviderId(pub String);
65
66impl ProviderId {
67 pub fn new(value: impl Into<String>) -> Self {
68 Self(value.into())
69 }
70
71 pub fn as_str(&self) -> &str {
72 self.0.as_str()
73 }
74}
75
76impl From<&str> for ProviderId {
77 fn from(value: &str) -> Self {
78 Self::new(value)
79 }
80}
81
82impl From<String> for ProviderId {
83 fn from(value: String) -> Self {
84 Self::new(value)
85 }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
89#[serde(transparent)]
90pub struct TraceId(pub String);
91
92impl TraceId {
93 pub fn new() -> Self {
94 Self(format!("trace_{}", Uuid::now_v7()))
95 }
96}
97
98impl Default for TraceId {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
105#[serde(transparent)]
106pub struct TenantId(pub String);
107
108impl TenantId {
109 pub fn new(value: impl Into<String>) -> Self {
110 Self(value.into())
111 }
112}
113
114#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(tag = "state", rename_all = "snake_case")]
116pub enum SignatureStatus {
117 Verified,
118 Unsigned,
119 Failed { reason: String },
120}
121
122#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
123pub struct GitHubEventCommon {
124 pub event: String,
125 pub action: Option<String>,
126 pub delivery_id: Option<String>,
127 pub installation_id: Option<i64>,
128 pub raw: JsonValue,
129}
130
131#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
132pub struct GitHubIssuesEventPayload {
133 #[serde(flatten)]
134 pub common: GitHubEventCommon,
135 pub issue: JsonValue,
136}
137
138#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
139pub struct GitHubPullRequestEventPayload {
140 #[serde(flatten)]
141 pub common: GitHubEventCommon,
142 pub pull_request: JsonValue,
143}
144
145#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
146pub struct GitHubIssueCommentEventPayload {
147 #[serde(flatten)]
148 pub common: GitHubEventCommon,
149 pub issue: JsonValue,
150 pub comment: JsonValue,
151}
152
153#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
154pub struct GitHubPullRequestReviewEventPayload {
155 #[serde(flatten)]
156 pub common: GitHubEventCommon,
157 pub pull_request: JsonValue,
158 pub review: JsonValue,
159}
160
161#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
162pub struct GitHubPushEventPayload {
163 #[serde(flatten)]
164 pub common: GitHubEventCommon,
165 #[serde(default)]
166 pub commits: Vec<JsonValue>,
167 pub distinct_size: Option<i64>,
168}
169
170#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
171pub struct GitHubWorkflowRunEventPayload {
172 #[serde(flatten)]
173 pub common: GitHubEventCommon,
174 pub workflow_run: JsonValue,
175}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178pub struct GitHubDeploymentStatusEventPayload {
179 #[serde(flatten)]
180 pub common: GitHubEventCommon,
181 pub deployment_status: JsonValue,
182 pub deployment: JsonValue,
183}
184
185#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
186pub struct GitHubCheckRunEventPayload {
187 #[serde(flatten)]
188 pub common: GitHubEventCommon,
189 pub check_run: JsonValue,
190}
191
192#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
193#[serde(untagged)]
194pub enum GitHubEventPayload {
195 Issues(GitHubIssuesEventPayload),
196 PullRequest(GitHubPullRequestEventPayload),
197 IssueComment(GitHubIssueCommentEventPayload),
198 PullRequestReview(GitHubPullRequestReviewEventPayload),
199 Push(GitHubPushEventPayload),
200 WorkflowRun(GitHubWorkflowRunEventPayload),
201 DeploymentStatus(GitHubDeploymentStatusEventPayload),
202 CheckRun(GitHubCheckRunEventPayload),
203 Other(GitHubEventCommon),
204}
205
206#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
207pub struct SlackEventCommon {
208 pub event: String,
209 pub event_id: Option<String>,
210 pub api_app_id: Option<String>,
211 pub team_id: Option<String>,
212 pub channel_id: Option<String>,
213 pub user_id: Option<String>,
214 pub event_ts: Option<String>,
215 pub raw: JsonValue,
216}
217
218#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
219pub struct SlackMessageEventPayload {
220 #[serde(flatten)]
221 pub common: SlackEventCommon,
222 pub subtype: Option<String>,
223 pub channel_type: Option<String>,
224 pub channel: Option<String>,
225 pub user: Option<String>,
226 pub text: Option<String>,
227 pub ts: Option<String>,
228 pub thread_ts: Option<String>,
229}
230
231#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
232pub struct SlackAppMentionEventPayload {
233 #[serde(flatten)]
234 pub common: SlackEventCommon,
235 pub channel: Option<String>,
236 pub user: Option<String>,
237 pub text: Option<String>,
238 pub ts: Option<String>,
239 pub thread_ts: Option<String>,
240}
241
242#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
243pub struct SlackReactionAddedEventPayload {
244 #[serde(flatten)]
245 pub common: SlackEventCommon,
246 pub reaction: Option<String>,
247 pub item_user: Option<String>,
248 pub item: JsonValue,
249}
250
251#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
252pub struct SlackAppHomeOpenedEventPayload {
253 #[serde(flatten)]
254 pub common: SlackEventCommon,
255 pub user: Option<String>,
256 pub channel: Option<String>,
257 pub tab: Option<String>,
258 pub view: JsonValue,
259}
260
261#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
262pub struct SlackAssistantThreadStartedEventPayload {
263 #[serde(flatten)]
264 pub common: SlackEventCommon,
265 pub assistant_thread: JsonValue,
266 pub thread_ts: Option<String>,
267 pub context: JsonValue,
268}
269
270#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
271#[serde(untagged)]
272pub enum SlackEventPayload {
273 Message(SlackMessageEventPayload),
274 AppMention(SlackAppMentionEventPayload),
275 ReactionAdded(SlackReactionAddedEventPayload),
276 AppHomeOpened(SlackAppHomeOpenedEventPayload),
277 AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
278 Other(SlackEventCommon),
279}
280
281#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
282pub struct LinearEventCommon {
283 pub event: String,
284 pub action: Option<String>,
285 pub delivery_id: Option<String>,
286 pub organization_id: Option<String>,
287 pub webhook_timestamp: Option<i64>,
288 pub webhook_id: Option<String>,
289 pub url: Option<String>,
290 pub created_at: Option<String>,
291 pub actor: JsonValue,
292 pub raw: JsonValue,
293}
294
295#[derive(Clone, Debug, PartialEq)]
296pub enum LinearIssueChange {
297 Title { previous: Option<String> },
298 Description { previous: Option<String> },
299 Priority { previous: Option<i64> },
300 Estimate { previous: Option<i64> },
301 StateId { previous: Option<String> },
302 TeamId { previous: Option<String> },
303 AssigneeId { previous: Option<String> },
304 ProjectId { previous: Option<String> },
305 CycleId { previous: Option<String> },
306 DueDate { previous: Option<String> },
307 ParentId { previous: Option<String> },
308 SortOrder { previous: Option<f64> },
309 LabelIds { previous: Vec<String> },
310 CompletedAt { previous: Option<String> },
311 Other { field: String, previous: JsonValue },
312}
313
314impl Serialize for LinearIssueChange {
315 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
316 where
317 S: Serializer,
318 {
319 let value = match self {
320 Self::Title { previous } => {
321 serde_json::json!({ "field_name": "title", "previous": previous })
322 }
323 Self::Description { previous } => {
324 serde_json::json!({ "field_name": "description", "previous": previous })
325 }
326 Self::Priority { previous } => {
327 serde_json::json!({ "field_name": "priority", "previous": previous })
328 }
329 Self::Estimate { previous } => {
330 serde_json::json!({ "field_name": "estimate", "previous": previous })
331 }
332 Self::StateId { previous } => {
333 serde_json::json!({ "field_name": "state_id", "previous": previous })
334 }
335 Self::TeamId { previous } => {
336 serde_json::json!({ "field_name": "team_id", "previous": previous })
337 }
338 Self::AssigneeId { previous } => {
339 serde_json::json!({ "field_name": "assignee_id", "previous": previous })
340 }
341 Self::ProjectId { previous } => {
342 serde_json::json!({ "field_name": "project_id", "previous": previous })
343 }
344 Self::CycleId { previous } => {
345 serde_json::json!({ "field_name": "cycle_id", "previous": previous })
346 }
347 Self::DueDate { previous } => {
348 serde_json::json!({ "field_name": "due_date", "previous": previous })
349 }
350 Self::ParentId { previous } => {
351 serde_json::json!({ "field_name": "parent_id", "previous": previous })
352 }
353 Self::SortOrder { previous } => {
354 serde_json::json!({ "field_name": "sort_order", "previous": previous })
355 }
356 Self::LabelIds { previous } => {
357 serde_json::json!({ "field_name": "label_ids", "previous": previous })
358 }
359 Self::CompletedAt { previous } => {
360 serde_json::json!({ "field_name": "completed_at", "previous": previous })
361 }
362 Self::Other { field, previous } => {
363 serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
364 }
365 };
366 value.serialize(serializer)
367 }
368}
369
370impl<'de> Deserialize<'de> for LinearIssueChange {
371 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
372 where
373 D: Deserializer<'de>,
374 {
375 let value = JsonValue::deserialize(deserializer)?;
376 let field_name = value
377 .get("field_name")
378 .and_then(JsonValue::as_str)
379 .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
380 let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
381 Ok(match field_name {
382 "title" => Self::Title {
383 previous: previous.as_str().map(ToString::to_string),
384 },
385 "description" => Self::Description {
386 previous: previous.as_str().map(ToString::to_string),
387 },
388 "priority" => Self::Priority {
389 previous: parse_json_i64ish(&previous),
390 },
391 "estimate" => Self::Estimate {
392 previous: parse_json_i64ish(&previous),
393 },
394 "state_id" => Self::StateId {
395 previous: previous.as_str().map(ToString::to_string),
396 },
397 "team_id" => Self::TeamId {
398 previous: previous.as_str().map(ToString::to_string),
399 },
400 "assignee_id" => Self::AssigneeId {
401 previous: previous.as_str().map(ToString::to_string),
402 },
403 "project_id" => Self::ProjectId {
404 previous: previous.as_str().map(ToString::to_string),
405 },
406 "cycle_id" => Self::CycleId {
407 previous: previous.as_str().map(ToString::to_string),
408 },
409 "due_date" => Self::DueDate {
410 previous: previous.as_str().map(ToString::to_string),
411 },
412 "parent_id" => Self::ParentId {
413 previous: previous.as_str().map(ToString::to_string),
414 },
415 "sort_order" => Self::SortOrder {
416 previous: previous.as_f64(),
417 },
418 "label_ids" => Self::LabelIds {
419 previous: parse_string_array(&previous),
420 },
421 "completed_at" => Self::CompletedAt {
422 previous: previous.as_str().map(ToString::to_string),
423 },
424 "other" => Self::Other {
425 field: value
426 .get("field")
427 .and_then(JsonValue::as_str)
428 .map(ToString::to_string)
429 .unwrap_or_else(|| "unknown".to_string()),
430 previous,
431 },
432 other => Self::Other {
433 field: other.to_string(),
434 previous,
435 },
436 })
437 }
438}
439
440#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
441pub struct LinearIssueEventPayload {
442 #[serde(flatten)]
443 pub common: LinearEventCommon,
444 pub issue: JsonValue,
445 #[serde(default)]
446 pub changes: Vec<LinearIssueChange>,
447}
448
449#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
450pub struct LinearIssueCommentEventPayload {
451 #[serde(flatten)]
452 pub common: LinearEventCommon,
453 pub comment: JsonValue,
454}
455
456#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
457pub struct LinearIssueLabelEventPayload {
458 #[serde(flatten)]
459 pub common: LinearEventCommon,
460 pub label: JsonValue,
461}
462
463#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
464pub struct LinearProjectEventPayload {
465 #[serde(flatten)]
466 pub common: LinearEventCommon,
467 pub project: JsonValue,
468}
469
470#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
471pub struct LinearCycleEventPayload {
472 #[serde(flatten)]
473 pub common: LinearEventCommon,
474 pub cycle: JsonValue,
475}
476
477#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
478pub struct LinearCustomerEventPayload {
479 #[serde(flatten)]
480 pub common: LinearEventCommon,
481 pub customer: JsonValue,
482}
483
484#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
485pub struct LinearCustomerRequestEventPayload {
486 #[serde(flatten)]
487 pub common: LinearEventCommon,
488 pub customer_request: JsonValue,
489}
490
491#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
492#[serde(untagged)]
493pub enum LinearEventPayload {
494 Issue(LinearIssueEventPayload),
495 IssueComment(LinearIssueCommentEventPayload),
496 IssueLabel(LinearIssueLabelEventPayload),
497 Project(LinearProjectEventPayload),
498 Cycle(LinearCycleEventPayload),
499 Customer(LinearCustomerEventPayload),
500 CustomerRequest(LinearCustomerRequestEventPayload),
501 Other(LinearEventCommon),
502}
503
504#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
505pub struct NotionPolledChangeEvent {
506 pub resource: String,
507 pub source_id: String,
508 pub entity_id: String,
509 pub high_water_mark: String,
510 pub before: Option<JsonValue>,
511 pub after: JsonValue,
512}
513
514#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
515pub struct NotionEventPayload {
516 pub event: String,
517 pub workspace_id: Option<String>,
518 pub request_id: Option<String>,
519 pub subscription_id: Option<String>,
520 pub integration_id: Option<String>,
521 pub attempt_number: Option<u32>,
522 pub entity_id: Option<String>,
523 pub entity_type: Option<String>,
524 pub api_version: Option<String>,
525 pub verification_token: Option<String>,
526 pub polled: Option<NotionPolledChangeEvent>,
527 pub raw: JsonValue,
528}
529
530#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
531pub struct CronEventPayload {
532 pub cron_id: Option<String>,
533 pub schedule: Option<String>,
534 #[serde(with = "time::serde::rfc3339")]
535 pub tick_at: OffsetDateTime,
536 pub raw: JsonValue,
537}
538
539#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
540pub struct GenericWebhookPayload {
541 pub source: Option<String>,
542 pub content_type: Option<String>,
543 pub raw: JsonValue,
544}
545
546#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
547pub struct A2aPushPayload {
548 pub task_id: Option<String>,
549 pub sender: Option<String>,
550 pub raw: JsonValue,
551}
552
553#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
554pub struct StreamEventPayload {
555 pub event: String,
556 pub source: Option<String>,
557 pub stream: Option<String>,
558 pub partition: Option<String>,
559 pub offset: Option<String>,
560 pub key: Option<String>,
561 pub timestamp: Option<String>,
562 #[serde(default)]
563 pub headers: BTreeMap<String, String>,
564 pub raw: JsonValue,
565}
566
567#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
568pub struct ExtensionProviderPayload {
569 pub provider: String,
570 pub schema_name: String,
571 pub raw: JsonValue,
572}
573
574#[allow(clippy::large_enum_variant)]
575#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
576#[serde(untagged)]
577pub enum ProviderPayload {
578 Known(KnownProviderPayload),
579 Extension(ExtensionProviderPayload),
580}
581
582impl ProviderPayload {
583 pub fn provider(&self) -> &str {
584 match self {
585 Self::Known(known) => known.provider(),
586 Self::Extension(payload) => payload.provider.as_str(),
587 }
588 }
589
590 pub fn normalize(
591 provider: &ProviderId,
592 kind: &str,
593 headers: &BTreeMap<String, String>,
594 raw: JsonValue,
595 ) -> Result<Self, ProviderCatalogError> {
596 provider_catalog()
597 .read()
598 .expect("provider catalog poisoned")
599 .normalize(provider, kind, headers, raw)
600 }
601}
602
603#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
604#[serde(tag = "provider")]
605pub enum KnownProviderPayload {
606 #[serde(rename = "github")]
607 GitHub(GitHubEventPayload),
608 #[serde(rename = "slack")]
609 Slack(Box<SlackEventPayload>),
610 #[serde(rename = "linear")]
611 Linear(LinearEventPayload),
612 #[serde(rename = "notion")]
613 Notion(Box<NotionEventPayload>),
614 #[serde(rename = "cron")]
615 Cron(CronEventPayload),
616 #[serde(rename = "webhook")]
617 Webhook(GenericWebhookPayload),
618 #[serde(rename = "a2a-push")]
619 A2aPush(A2aPushPayload),
620 #[serde(rename = "kafka")]
621 Kafka(StreamEventPayload),
622 #[serde(rename = "nats")]
623 Nats(StreamEventPayload),
624 #[serde(rename = "pulsar")]
625 Pulsar(StreamEventPayload),
626 #[serde(rename = "postgres-cdc")]
627 PostgresCdc(StreamEventPayload),
628 #[serde(rename = "email")]
629 Email(StreamEventPayload),
630 #[serde(rename = "websocket")]
631 Websocket(StreamEventPayload),
632}
633
634impl KnownProviderPayload {
635 pub fn provider(&self) -> &str {
636 match self {
637 Self::GitHub(_) => "github",
638 Self::Slack(_) => "slack",
639 Self::Linear(_) => "linear",
640 Self::Notion(_) => "notion",
641 Self::Cron(_) => "cron",
642 Self::Webhook(_) => "webhook",
643 Self::A2aPush(_) => "a2a-push",
644 Self::Kafka(_) => "kafka",
645 Self::Nats(_) => "nats",
646 Self::Pulsar(_) => "pulsar",
647 Self::PostgresCdc(_) => "postgres-cdc",
648 Self::Email(_) => "email",
649 Self::Websocket(_) => "websocket",
650 }
651 }
652}
653
654#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
655pub struct TriggerEvent {
656 pub id: TriggerEventId,
657 pub provider: ProviderId,
658 pub kind: String,
659 #[serde(with = "time::serde::rfc3339")]
660 pub received_at: OffsetDateTime,
661 #[serde(with = "time::serde::rfc3339::option")]
662 pub occurred_at: Option<OffsetDateTime>,
663 pub dedupe_key: String,
664 pub trace_id: TraceId,
665 pub tenant_id: Option<TenantId>,
666 pub headers: BTreeMap<String, String>,
667 #[serde(default, skip_serializing_if = "Option::is_none")]
668 pub batch: Option<Vec<JsonValue>>,
669 #[serde(
670 default,
671 skip_serializing_if = "Option::is_none",
672 serialize_with = "serialize_optional_bytes_b64",
673 deserialize_with = "deserialize_optional_bytes_b64"
674 )]
675 pub raw_body: Option<Vec<u8>>,
676 pub provider_payload: ProviderPayload,
677 pub signature_status: SignatureStatus,
678 #[serde(skip)]
679 pub dedupe_claimed: bool,
680}
681
682impl TriggerEvent {
683 #[allow(clippy::too_many_arguments)]
684 pub fn new(
685 provider: ProviderId,
686 kind: impl Into<String>,
687 occurred_at: Option<OffsetDateTime>,
688 dedupe_key: impl Into<String>,
689 tenant_id: Option<TenantId>,
690 headers: BTreeMap<String, String>,
691 provider_payload: ProviderPayload,
692 signature_status: SignatureStatus,
693 ) -> Self {
694 Self {
695 id: TriggerEventId::new(),
696 provider,
697 kind: kind.into(),
698 received_at: clock::now_utc(),
699 occurred_at,
700 dedupe_key: dedupe_key.into(),
701 trace_id: TraceId::new(),
702 tenant_id,
703 headers,
704 batch: None,
705 raw_body: None,
706 provider_payload,
707 signature_status,
708 dedupe_claimed: false,
709 }
710 }
711
712 pub fn dedupe_claimed(&self) -> bool {
713 self.dedupe_claimed
714 }
715
716 pub fn mark_dedupe_claimed(&mut self) {
717 self.dedupe_claimed = true;
718 }
719}
720
/// Decides which header values are replaced before headers are exposed.
///
/// A header is redacted only when it is *not* kept and its lowercased name contains
/// one of the sensitive markers (`authorization`, `cookie`, `secret`, `token`,
/// `key`). Keeping always wins over redacting.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeaderRedactionPolicy {
    /// Lowercased header names that are always kept.
    safe_exact_names: BTreeSet<String>,
}

impl HeaderRedactionPolicy {
    /// Adds `name` (lowercased) to the set of headers that are always kept.
    pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
        let normalized = name.into().to_ascii_lowercase();
        self.safe_exact_names.insert(normalized);
        self
    }

    /// Returns `true` when the header must be left untouched.
    ///
    /// Matching is case-insensitive. A header is kept when it is in the policy's
    /// safe set, is one of the built-in names below, ends with `-event` or
    /// `-delivery`, or contains `timestamp` or `request-id`.
    fn should_keep(&self, name: &str) -> bool {
        const BUILTIN_KEPT: [&str; 16] = [
            "user-agent",
            "request-id",
            "x-request-id",
            "x-correlation-id",
            "content-type",
            "content-length",
            "x-github-event",
            "x-github-delivery",
            "x-github-hook-id",
            "x-hub-signature-256",
            "x-slack-request-timestamp",
            "x-slack-signature",
            "x-linear-signature",
            "x-notion-signature",
            "x-a2a-signature",
            "x-a2a-delivery",
        ];

        let header = name.to_ascii_lowercase();
        self.safe_exact_names.contains(header.as_str())
            || BUILTIN_KEPT.contains(&header.as_str())
            || header.ends_with("-event")
            || header.ends_with("-delivery")
            || header.contains("timestamp")
            || header.contains("request-id")
    }

    /// Returns `true` when the header's value must be replaced.
    ///
    /// Kept headers are never redacted; otherwise the lowercased name is checked
    /// for the sensitive markers.
    fn should_redact(&self, name: &str) -> bool {
        const SENSITIVE_MARKERS: [&str; 5] = ["authorization", "cookie", "secret", "token", "key"];

        if self.should_keep(name) {
            return false;
        }
        let header = name.to_ascii_lowercase();
        SENSITIVE_MARKERS
            .iter()
            .any(|marker| header.contains(*marker))
    }
}

impl Default for HeaderRedactionPolicy {
    /// Starts with the same sixteen names that `should_keep` also recognises
    /// built in.
    fn default() -> Self {
        let defaults = [
            "content-length",
            "content-type",
            "request-id",
            "user-agent",
            "x-a2a-delivery",
            "x-a2a-signature",
            "x-correlation-id",
            "x-github-delivery",
            "x-github-event",
            "x-github-hook-id",
            "x-hub-signature-256",
            "x-linear-signature",
            "x-notion-signature",
            "x-request-id",
            "x-slack-request-timestamp",
            "x-slack-signature",
        ];
        Self {
            safe_exact_names: defaults.iter().map(|name| (*name).to_string()).collect(),
        }
    }
}
799
800pub fn redact_headers(
801 headers: &BTreeMap<String, String>,
802 policy: &HeaderRedactionPolicy,
803) -> BTreeMap<String, String> {
804 headers
805 .iter()
806 .map(|(name, value)| {
807 if policy.should_redact(name) {
808 (name.clone(), REDACTED_HEADER_VALUE.to_string())
809 } else {
810 (name.clone(), value.clone())
811 }
812 })
813 .collect()
814}
815
816#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
817pub struct ProviderSecretRequirement {
818 pub name: String,
819 pub required: bool,
820 pub namespace: String,
821}
822
823#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
824pub struct ProviderOutboundMethod {
825 pub name: String,
826}
827
828#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
829#[serde(tag = "kind", rename_all = "snake_case")]
830pub enum SignatureVerificationMetadata {
831 #[default]
832 None,
833 Hmac {
834 variant: String,
835 raw_body: bool,
836 signature_header: String,
837 timestamp_header: Option<String>,
838 id_header: Option<String>,
839 default_tolerance_secs: Option<i64>,
840 digest: String,
841 encoding: String,
842 },
843}
844
845#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
846#[serde(tag = "kind", rename_all = "snake_case")]
847pub enum ProviderRuntimeMetadata {
848 Builtin {
849 connector: String,
850 default_signature_variant: Option<String>,
851 },
852 #[default]
853 Placeholder,
854}
855
856#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
857pub struct ProviderMetadata {
858 pub provider: String,
859 #[serde(default)]
860 pub kinds: Vec<String>,
861 pub schema_name: String,
862 #[serde(default)]
863 pub outbound_methods: Vec<ProviderOutboundMethod>,
864 #[serde(default)]
865 pub secret_requirements: Vec<ProviderSecretRequirement>,
866 #[serde(default)]
867 pub signature_verification: SignatureVerificationMetadata,
868 #[serde(default)]
869 pub runtime: ProviderRuntimeMetadata,
870}
871
872impl ProviderMetadata {
873 pub fn supports_kind(&self, kind: &str) -> bool {
874 self.kinds.iter().any(|candidate| candidate == kind)
875 }
876
877 pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
878 self.secret_requirements
879 .iter()
880 .filter(|requirement| requirement.required)
881 .map(|requirement| requirement.name.as_str())
882 }
883}
884
885pub trait ProviderSchema: Send + Sync {
886 fn provider_id(&self) -> &'static str;
887 fn harn_schema_name(&self) -> &'static str;
888 fn metadata(&self) -> ProviderMetadata {
889 ProviderMetadata {
890 provider: self.provider_id().to_string(),
891 schema_name: self.harn_schema_name().to_string(),
892 ..ProviderMetadata::default()
893 }
894 }
895 fn normalize(
896 &self,
897 kind: &str,
898 headers: &BTreeMap<String, String>,
899 raw: JsonValue,
900 ) -> Result<ProviderPayload, ProviderCatalogError>;
901}
902
/// Errors produced by [`ProviderCatalog`] operations. Each variant carries the
/// provider name involved.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProviderCatalogError {
    /// A provider with this name is already in the catalog.
    DuplicateProvider(String),
    /// No provider with this name is in the catalog.
    UnknownProvider(String),
}

impl std::fmt::Display for ProviderCatalogError {
    /// Renders a one-line message with the provider name in backticks.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnknownProvider(provider) => {
                formatter.write_fmt(format_args!("provider `{provider}` is not registered"))
            }
            Self::DuplicateProvider(provider) => {
                formatter.write_fmt(format_args!("provider `{provider}` is already registered"))
            }
        }
    }
}

impl std::error::Error for ProviderCatalogError {}
921
922#[derive(Clone, Default)]
923pub struct ProviderCatalog {
924 providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
925}
926
927impl ProviderCatalog {
928 pub fn with_defaults() -> Self {
929 let mut catalog = Self::default();
930 for schema in default_provider_schemas() {
931 catalog
932 .register(schema)
933 .expect("default providers must register cleanly");
934 }
935 catalog
936 }
937
938 pub fn register(
939 &mut self,
940 schema: Arc<dyn ProviderSchema>,
941 ) -> Result<(), ProviderCatalogError> {
942 let provider = schema.provider_id().to_string();
943 if self.providers.contains_key(provider.as_str()) {
944 return Err(ProviderCatalogError::DuplicateProvider(provider));
945 }
946 self.providers.insert(provider, schema);
947 Ok(())
948 }
949
950 pub fn normalize(
951 &self,
952 provider: &ProviderId,
953 kind: &str,
954 headers: &BTreeMap<String, String>,
955 raw: JsonValue,
956 ) -> Result<ProviderPayload, ProviderCatalogError> {
957 let schema = self
958 .providers
959 .get(provider.as_str())
960 .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
961 schema.normalize(kind, headers, raw)
962 }
963
964 pub fn schema_names(&self) -> BTreeMap<String, String> {
965 self.providers
966 .iter()
967 .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
968 .collect()
969 }
970
971 pub fn entries(&self) -> Vec<ProviderMetadata> {
972 self.providers
973 .values()
974 .map(|schema| schema.metadata())
975 .collect()
976 }
977
978 pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
979 self.providers.get(provider).map(|schema| schema.metadata())
980 }
981}
982
983pub fn register_provider_schema(
984 schema: Arc<dyn ProviderSchema>,
985) -> Result<(), ProviderCatalogError> {
986 provider_catalog()
987 .write()
988 .expect("provider catalog poisoned")
989 .register(schema)
990}
991
992pub fn reset_provider_catalog() {
993 *provider_catalog()
994 .write()
995 .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
996}
997
998pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
999 provider_catalog()
1000 .read()
1001 .expect("provider catalog poisoned")
1002 .schema_names()
1003}
1004
1005pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
1006 provider_catalog()
1007 .read()
1008 .expect("provider catalog poisoned")
1009 .entries()
1010}
1011
1012pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
1013 provider_catalog()
1014 .read()
1015 .expect("provider catalog poisoned")
1016 .metadata_for(provider)
1017}
1018
1019fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
1020 static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
1021 PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
1022}
1023
1024struct BuiltinProviderSchema {
1025 provider_id: &'static str,
1026 harn_schema_name: &'static str,
1027 metadata: ProviderMetadata,
1028 normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1029}
1030
1031impl ProviderSchema for BuiltinProviderSchema {
1032 fn provider_id(&self) -> &'static str {
1033 self.provider_id
1034 }
1035
1036 fn harn_schema_name(&self) -> &'static str {
1037 self.harn_schema_name
1038 }
1039
1040 fn metadata(&self) -> ProviderMetadata {
1041 self.metadata.clone()
1042 }
1043
1044 fn normalize(
1045 &self,
1046 kind: &str,
1047 headers: &BTreeMap<String, String>,
1048 raw: JsonValue,
1049 ) -> Result<ProviderPayload, ProviderCatalogError> {
1050 Ok((self.normalize)(kind, headers, raw))
1051 }
1052}
1053
1054fn provider_metadata_entry(
1055 provider: &str,
1056 kinds: &[&str],
1057 schema_name: &str,
1058 outbound_methods: &[&str],
1059 signature_verification: SignatureVerificationMetadata,
1060 secret_requirements: Vec<ProviderSecretRequirement>,
1061 runtime: ProviderRuntimeMetadata,
1062) -> ProviderMetadata {
1063 ProviderMetadata {
1064 provider: provider.to_string(),
1065 kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
1066 schema_name: schema_name.to_string(),
1067 outbound_methods: outbound_methods
1068 .iter()
1069 .map(|name| ProviderOutboundMethod {
1070 name: (*name).to_string(),
1071 })
1072 .collect(),
1073 secret_requirements,
1074 signature_verification,
1075 runtime,
1076 }
1077}
1078
1079fn hmac_signature_metadata(
1080 variant: &str,
1081 signature_header: &str,
1082 timestamp_header: Option<&str>,
1083 id_header: Option<&str>,
1084 default_tolerance_secs: Option<i64>,
1085 encoding: &str,
1086) -> SignatureVerificationMetadata {
1087 SignatureVerificationMetadata::Hmac {
1088 variant: variant.to_string(),
1089 raw_body: true,
1090 signature_header: signature_header.to_string(),
1091 timestamp_header: timestamp_header.map(ToString::to_string),
1092 id_header: id_header.map(ToString::to_string),
1093 default_tolerance_secs,
1094 digest: "sha256".to_string(),
1095 encoding: encoding.to_string(),
1096 }
1097}
1098
1099fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1100 ProviderSecretRequirement {
1101 name: name.to_string(),
1102 required: true,
1103 namespace: namespace.to_string(),
1104 }
1105}
1106
1107fn outbound_method(name: &str) -> ProviderOutboundMethod {
1108 ProviderOutboundMethod {
1109 name: name.to_string(),
1110 }
1111}
1112
1113fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1114 ProviderSecretRequirement {
1115 name: name.to_string(),
1116 required: false,
1117 namespace: namespace.to_string(),
1118 }
1119}
1120
1121fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1122 vec![
1123 Arc::new(BuiltinProviderSchema {
1124 provider_id: "github",
1125 harn_schema_name: "GitHubEventPayload",
1126 metadata: provider_metadata_entry(
1127 "github",
1128 &["webhook"],
1129 "GitHubEventPayload",
1130 &[],
1131 hmac_signature_metadata(
1132 "github",
1133 "X-Hub-Signature-256",
1134 None,
1135 Some("X-GitHub-Delivery"),
1136 None,
1137 "hex",
1138 ),
1139 vec![required_secret("signing_secret", "github")],
1140 ProviderRuntimeMetadata::Builtin {
1141 connector: "webhook".to_string(),
1142 default_signature_variant: Some("github".to_string()),
1143 },
1144 ),
1145 normalize: github_payload,
1146 }),
1147 Arc::new(BuiltinProviderSchema {
1148 provider_id: "slack",
1149 harn_schema_name: "SlackEventPayload",
1150 metadata: provider_metadata_entry(
1151 "slack",
1152 &["webhook"],
1153 "SlackEventPayload",
1154 &[
1155 "post_message",
1156 "update_message",
1157 "add_reaction",
1158 "open_view",
1159 "user_info",
1160 "api_call",
1161 "upload_file",
1162 ],
1163 hmac_signature_metadata(
1164 "slack",
1165 "X-Slack-Signature",
1166 Some("X-Slack-Request-Timestamp"),
1167 None,
1168 Some(300),
1169 "hex",
1170 ),
1171 vec![required_secret("signing_secret", "slack")],
1172 ProviderRuntimeMetadata::Builtin {
1173 connector: "slack".to_string(),
1174 default_signature_variant: Some("slack".to_string()),
1175 },
1176 ),
1177 normalize: slack_payload,
1178 }),
1179 Arc::new(BuiltinProviderSchema {
1180 provider_id: "linear",
1181 harn_schema_name: "LinearEventPayload",
1182 metadata: {
1183 let mut metadata = provider_metadata_entry(
1184 "linear",
1185 &["webhook"],
1186 "LinearEventPayload",
1187 &[],
1188 hmac_signature_metadata(
1189 "linear",
1190 "Linear-Signature",
1191 None,
1192 Some("Linear-Delivery"),
1193 Some(75),
1194 "hex",
1195 ),
1196 vec![
1197 required_secret("signing_secret", "linear"),
1198 optional_secret("access_token", "linear"),
1199 ],
1200 ProviderRuntimeMetadata::Builtin {
1201 connector: "linear".to_string(),
1202 default_signature_variant: Some("linear".to_string()),
1203 },
1204 );
1205 metadata.outbound_methods = vec![
1206 ProviderOutboundMethod {
1207 name: "list_issues".to_string(),
1208 },
1209 ProviderOutboundMethod {
1210 name: "update_issue".to_string(),
1211 },
1212 ProviderOutboundMethod {
1213 name: "create_comment".to_string(),
1214 },
1215 ProviderOutboundMethod {
1216 name: "search".to_string(),
1217 },
1218 ProviderOutboundMethod {
1219 name: "graphql".to_string(),
1220 },
1221 ];
1222 metadata
1223 },
1224 normalize: linear_payload,
1225 }),
1226 Arc::new(BuiltinProviderSchema {
1227 provider_id: "notion",
1228 harn_schema_name: "NotionEventPayload",
1229 metadata: {
1230 let mut metadata = provider_metadata_entry(
1231 "notion",
1232 &["webhook", "poll"],
1233 "NotionEventPayload",
1234 &[],
1235 hmac_signature_metadata(
1236 "notion",
1237 "X-Notion-Signature",
1238 None,
1239 None,
1240 None,
1241 "hex",
1242 ),
1243 vec![required_secret("verification_token", "notion")],
1244 ProviderRuntimeMetadata::Builtin {
1245 connector: "notion".to_string(),
1246 default_signature_variant: Some("notion".to_string()),
1247 },
1248 );
1249 metadata.outbound_methods = vec![
1250 outbound_method("get_page"),
1251 outbound_method("update_page"),
1252 outbound_method("append_blocks"),
1253 outbound_method("query_database"),
1254 outbound_method("search"),
1255 outbound_method("create_comment"),
1256 outbound_method("api_call"),
1257 ];
1258 metadata
1259 },
1260 normalize: notion_payload,
1261 }),
1262 Arc::new(BuiltinProviderSchema {
1263 provider_id: "cron",
1264 harn_schema_name: "CronEventPayload",
1265 metadata: provider_metadata_entry(
1266 "cron",
1267 &["cron"],
1268 "CronEventPayload",
1269 &[],
1270 SignatureVerificationMetadata::None,
1271 Vec::new(),
1272 ProviderRuntimeMetadata::Builtin {
1273 connector: "cron".to_string(),
1274 default_signature_variant: None,
1275 },
1276 ),
1277 normalize: cron_payload,
1278 }),
1279 Arc::new(BuiltinProviderSchema {
1280 provider_id: "webhook",
1281 harn_schema_name: "GenericWebhookPayload",
1282 metadata: provider_metadata_entry(
1283 "webhook",
1284 &["webhook"],
1285 "GenericWebhookPayload",
1286 &[],
1287 hmac_signature_metadata(
1288 "standard",
1289 "webhook-signature",
1290 Some("webhook-timestamp"),
1291 Some("webhook-id"),
1292 Some(300),
1293 "base64",
1294 ),
1295 vec![required_secret("signing_secret", "webhook")],
1296 ProviderRuntimeMetadata::Builtin {
1297 connector: "webhook".to_string(),
1298 default_signature_variant: Some("standard".to_string()),
1299 },
1300 ),
1301 normalize: webhook_payload,
1302 }),
1303 Arc::new(BuiltinProviderSchema {
1304 provider_id: "a2a-push",
1305 harn_schema_name: "A2aPushPayload",
1306 metadata: provider_metadata_entry(
1307 "a2a-push",
1308 &["a2a-push"],
1309 "A2aPushPayload",
1310 &[],
1311 SignatureVerificationMetadata::None,
1312 Vec::new(),
1313 ProviderRuntimeMetadata::Placeholder,
1314 ),
1315 normalize: a2a_push_payload,
1316 }),
1317 Arc::new(stream_provider_schema("kafka", kafka_payload)),
1318 Arc::new(stream_provider_schema("nats", nats_payload)),
1319 Arc::new(stream_provider_schema("pulsar", pulsar_payload)),
1320 Arc::new(stream_provider_schema("postgres-cdc", postgres_cdc_payload)),
1321 Arc::new(stream_provider_schema("email", email_payload)),
1322 Arc::new(stream_provider_schema("websocket", websocket_payload)),
1323 ]
1324}
1325
1326fn stream_provider_schema(
1327 provider_id: &'static str,
1328 normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1329) -> BuiltinProviderSchema {
1330 BuiltinProviderSchema {
1331 provider_id,
1332 harn_schema_name: "StreamEventPayload",
1333 metadata: provider_metadata_entry(
1334 provider_id,
1335 &["stream"],
1336 "StreamEventPayload",
1337 &[],
1338 SignatureVerificationMetadata::None,
1339 Vec::new(),
1340 ProviderRuntimeMetadata::Placeholder,
1341 ),
1342 normalize,
1343 }
1344}
1345
1346fn github_payload(
1347 kind: &str,
1348 headers: &BTreeMap<String, String>,
1349 raw: JsonValue,
1350) -> ProviderPayload {
1351 let common = GitHubEventCommon {
1352 event: kind.to_string(),
1353 action: raw
1354 .get("action")
1355 .and_then(JsonValue::as_str)
1356 .map(ToString::to_string),
1357 delivery_id: headers.get("X-GitHub-Delivery").cloned(),
1358 installation_id: raw
1359 .get("installation")
1360 .and_then(|value| value.get("id"))
1361 .and_then(JsonValue::as_i64),
1362 raw: raw.clone(),
1363 };
1364 let payload = match kind {
1365 "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1366 common,
1367 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1368 }),
1369 "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1370 common,
1371 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1372 }),
1373 "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1374 common,
1375 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1376 comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1377 }),
1378 "pull_request_review" => {
1379 GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1380 common,
1381 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1382 review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1383 })
1384 }
1385 "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1386 common,
1387 commits: raw
1388 .get("commits")
1389 .and_then(JsonValue::as_array)
1390 .cloned()
1391 .unwrap_or_default(),
1392 distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1393 }),
1394 "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1395 common,
1396 workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1397 }),
1398 "deployment_status" => {
1399 GitHubEventPayload::DeploymentStatus(GitHubDeploymentStatusEventPayload {
1400 common,
1401 deployment_status: raw
1402 .get("deployment_status")
1403 .cloned()
1404 .unwrap_or(JsonValue::Null),
1405 deployment: raw.get("deployment").cloned().unwrap_or(JsonValue::Null),
1406 })
1407 }
1408 "check_run" => GitHubEventPayload::CheckRun(GitHubCheckRunEventPayload {
1409 common,
1410 check_run: raw.get("check_run").cloned().unwrap_or(JsonValue::Null),
1411 }),
1412 _ => GitHubEventPayload::Other(common),
1413 };
1414 ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1415}
1416
1417fn slack_payload(
1418 kind: &str,
1419 _headers: &BTreeMap<String, String>,
1420 raw: JsonValue,
1421) -> ProviderPayload {
1422 let event = raw.get("event");
1423 let common = SlackEventCommon {
1424 event: kind.to_string(),
1425 event_id: raw
1426 .get("event_id")
1427 .and_then(JsonValue::as_str)
1428 .map(ToString::to_string),
1429 api_app_id: raw
1430 .get("api_app_id")
1431 .and_then(JsonValue::as_str)
1432 .map(ToString::to_string),
1433 team_id: raw
1434 .get("team_id")
1435 .and_then(JsonValue::as_str)
1436 .map(ToString::to_string),
1437 channel_id: slack_channel_id(event),
1438 user_id: slack_user_id(event),
1439 event_ts: event
1440 .and_then(|value| value.get("event_ts"))
1441 .and_then(JsonValue::as_str)
1442 .map(ToString::to_string),
1443 raw: raw.clone(),
1444 };
1445 let payload = match kind {
1446 kind if kind == "message" || kind.starts_with("message.") => {
1447 SlackEventPayload::Message(SlackMessageEventPayload {
1448 subtype: event
1449 .and_then(|value| value.get("subtype"))
1450 .and_then(JsonValue::as_str)
1451 .map(ToString::to_string),
1452 channel_type: event
1453 .and_then(|value| value.get("channel_type"))
1454 .and_then(JsonValue::as_str)
1455 .map(ToString::to_string),
1456 channel: event
1457 .and_then(|value| value.get("channel"))
1458 .and_then(JsonValue::as_str)
1459 .map(ToString::to_string),
1460 user: event
1461 .and_then(|value| value.get("user"))
1462 .and_then(JsonValue::as_str)
1463 .map(ToString::to_string),
1464 text: event
1465 .and_then(|value| value.get("text"))
1466 .and_then(JsonValue::as_str)
1467 .map(ToString::to_string),
1468 ts: event
1469 .and_then(|value| value.get("ts"))
1470 .and_then(JsonValue::as_str)
1471 .map(ToString::to_string),
1472 thread_ts: event
1473 .and_then(|value| value.get("thread_ts"))
1474 .and_then(JsonValue::as_str)
1475 .map(ToString::to_string),
1476 common,
1477 })
1478 }
1479 "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1480 channel: event
1481 .and_then(|value| value.get("channel"))
1482 .and_then(JsonValue::as_str)
1483 .map(ToString::to_string),
1484 user: event
1485 .and_then(|value| value.get("user"))
1486 .and_then(JsonValue::as_str)
1487 .map(ToString::to_string),
1488 text: event
1489 .and_then(|value| value.get("text"))
1490 .and_then(JsonValue::as_str)
1491 .map(ToString::to_string),
1492 ts: event
1493 .and_then(|value| value.get("ts"))
1494 .and_then(JsonValue::as_str)
1495 .map(ToString::to_string),
1496 thread_ts: event
1497 .and_then(|value| value.get("thread_ts"))
1498 .and_then(JsonValue::as_str)
1499 .map(ToString::to_string),
1500 common,
1501 }),
1502 "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1503 reaction: event
1504 .and_then(|value| value.get("reaction"))
1505 .and_then(JsonValue::as_str)
1506 .map(ToString::to_string),
1507 item_user: event
1508 .and_then(|value| value.get("item_user"))
1509 .and_then(JsonValue::as_str)
1510 .map(ToString::to_string),
1511 item: event
1512 .and_then(|value| value.get("item"))
1513 .cloned()
1514 .unwrap_or(JsonValue::Null),
1515 common,
1516 }),
1517 "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1518 user: event
1519 .and_then(|value| value.get("user"))
1520 .and_then(JsonValue::as_str)
1521 .map(ToString::to_string),
1522 channel: event
1523 .and_then(|value| value.get("channel"))
1524 .and_then(JsonValue::as_str)
1525 .map(ToString::to_string),
1526 tab: event
1527 .and_then(|value| value.get("tab"))
1528 .and_then(JsonValue::as_str)
1529 .map(ToString::to_string),
1530 view: event
1531 .and_then(|value| value.get("view"))
1532 .cloned()
1533 .unwrap_or(JsonValue::Null),
1534 common,
1535 }),
1536 "assistant_thread_started" => {
1537 let assistant_thread = event
1538 .and_then(|value| value.get("assistant_thread"))
1539 .cloned()
1540 .unwrap_or(JsonValue::Null);
1541 SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1542 thread_ts: assistant_thread
1543 .get("thread_ts")
1544 .and_then(JsonValue::as_str)
1545 .map(ToString::to_string),
1546 context: assistant_thread
1547 .get("context")
1548 .cloned()
1549 .unwrap_or(JsonValue::Null),
1550 assistant_thread,
1551 common,
1552 })
1553 }
1554 _ => SlackEventPayload::Other(common),
1555 };
1556 ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1557}
1558
1559fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1560 event
1561 .and_then(|value| value.get("channel"))
1562 .and_then(JsonValue::as_str)
1563 .map(ToString::to_string)
1564 .or_else(|| {
1565 event
1566 .and_then(|value| value.get("item"))
1567 .and_then(|value| value.get("channel"))
1568 .and_then(JsonValue::as_str)
1569 .map(ToString::to_string)
1570 })
1571 .or_else(|| {
1572 event
1573 .and_then(|value| value.get("channel"))
1574 .and_then(|value| value.get("id"))
1575 .and_then(JsonValue::as_str)
1576 .map(ToString::to_string)
1577 })
1578 .or_else(|| {
1579 event
1580 .and_then(|value| value.get("assistant_thread"))
1581 .and_then(|value| value.get("channel_id"))
1582 .and_then(JsonValue::as_str)
1583 .map(ToString::to_string)
1584 })
1585}
1586
1587fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1588 event
1589 .and_then(|value| value.get("user"))
1590 .and_then(JsonValue::as_str)
1591 .map(ToString::to_string)
1592 .or_else(|| {
1593 event
1594 .and_then(|value| value.get("user"))
1595 .and_then(|value| value.get("id"))
1596 .and_then(JsonValue::as_str)
1597 .map(ToString::to_string)
1598 })
1599 .or_else(|| {
1600 event
1601 .and_then(|value| value.get("item_user"))
1602 .and_then(JsonValue::as_str)
1603 .map(ToString::to_string)
1604 })
1605 .or_else(|| {
1606 event
1607 .and_then(|value| value.get("assistant_thread"))
1608 .and_then(|value| value.get("user_id"))
1609 .and_then(JsonValue::as_str)
1610 .map(ToString::to_string)
1611 })
1612}
1613
1614fn linear_payload(
1615 _kind: &str,
1616 headers: &BTreeMap<String, String>,
1617 raw: JsonValue,
1618) -> ProviderPayload {
1619 let common = linear_event_common(headers, &raw);
1620 let event = common.event.clone();
1621 let payload = match event.as_str() {
1622 "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1623 common,
1624 issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1625 changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1626 }),
1627 "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1628 common,
1629 comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1630 }),
1631 "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1632 common,
1633 label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1634 }),
1635 "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1636 common,
1637 project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1638 }),
1639 "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1640 common,
1641 cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1642 }),
1643 "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
1644 common,
1645 customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1646 }),
1647 "customer_request" => {
1648 LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
1649 common,
1650 customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1651 })
1652 }
1653 _ => LinearEventPayload::Other(common),
1654 };
1655 ProviderPayload::Known(KnownProviderPayload::Linear(payload))
1656}
1657
1658fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
1659 LinearEventCommon {
1660 event: linear_event_name(
1661 raw.get("type")
1662 .and_then(JsonValue::as_str)
1663 .or_else(|| headers.get("Linear-Event").map(String::as_str)),
1664 ),
1665 action: raw
1666 .get("action")
1667 .and_then(JsonValue::as_str)
1668 .map(ToString::to_string),
1669 delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
1670 organization_id: raw
1671 .get("organizationId")
1672 .and_then(JsonValue::as_str)
1673 .map(ToString::to_string),
1674 webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
1675 webhook_id: raw
1676 .get("webhookId")
1677 .and_then(JsonValue::as_str)
1678 .map(ToString::to_string),
1679 url: raw
1680 .get("url")
1681 .and_then(JsonValue::as_str)
1682 .map(ToString::to_string),
1683 created_at: raw
1684 .get("createdAt")
1685 .and_then(JsonValue::as_str)
1686 .map(ToString::to_string),
1687 actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
1688 raw: raw.clone(),
1689 }
1690}
1691
/// Maps a Linear event type string onto its canonical lowercase name.
///
/// Matching is ASCII case-insensitive. Known aliases collapse onto one name
/// (for example `IssueComment` becomes `comment`), unrecognized non-empty
/// types are returned lowercased, and a missing or empty type becomes
/// `"other"`.
fn linear_event_name(raw_type: Option<&str>) -> String {
    let lowered = raw_type.map(str::to_ascii_lowercase).unwrap_or_default();
    let canonical = match lowered.as_str() {
        "" => "other",
        "issue" => "issue",
        "comment" | "issuecomment" | "issue_comment" => "comment",
        "issuelabel" | "issue_label" => "issue_label",
        "project" | "projectupdate" | "project_update" => "project",
        "cycle" => "cycle",
        "customer" => "customer",
        "customerrequest" | "customer_request" => "customer_request",
        unrecognized => unrecognized,
    };
    canonical.to_owned()
}
1705
1706fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
1707 let Some(JsonValue::Object(fields)) = updated_from else {
1708 return Vec::new();
1709 };
1710 let mut changes = Vec::new();
1711 for (field, previous) in fields {
1712 let change = match field.as_str() {
1713 "title" => LinearIssueChange::Title {
1714 previous: previous.as_str().map(ToString::to_string),
1715 },
1716 "description" => LinearIssueChange::Description {
1717 previous: previous.as_str().map(ToString::to_string),
1718 },
1719 "priority" => LinearIssueChange::Priority {
1720 previous: parse_json_i64ish(previous),
1721 },
1722 "estimate" => LinearIssueChange::Estimate {
1723 previous: parse_json_i64ish(previous),
1724 },
1725 "stateId" => LinearIssueChange::StateId {
1726 previous: previous.as_str().map(ToString::to_string),
1727 },
1728 "teamId" => LinearIssueChange::TeamId {
1729 previous: previous.as_str().map(ToString::to_string),
1730 },
1731 "assigneeId" => LinearIssueChange::AssigneeId {
1732 previous: previous.as_str().map(ToString::to_string),
1733 },
1734 "projectId" => LinearIssueChange::ProjectId {
1735 previous: previous.as_str().map(ToString::to_string),
1736 },
1737 "cycleId" => LinearIssueChange::CycleId {
1738 previous: previous.as_str().map(ToString::to_string),
1739 },
1740 "dueDate" => LinearIssueChange::DueDate {
1741 previous: previous.as_str().map(ToString::to_string),
1742 },
1743 "parentId" => LinearIssueChange::ParentId {
1744 previous: previous.as_str().map(ToString::to_string),
1745 },
1746 "sortOrder" => LinearIssueChange::SortOrder {
1747 previous: previous.as_f64(),
1748 },
1749 "labelIds" => LinearIssueChange::LabelIds {
1750 previous: parse_string_array(previous),
1751 },
1752 "completedAt" => LinearIssueChange::CompletedAt {
1753 previous: previous.as_str().map(ToString::to_string),
1754 },
1755 _ => LinearIssueChange::Other {
1756 field: field.clone(),
1757 previous: previous.clone(),
1758 },
1759 };
1760 changes.push(change);
1761 }
1762 changes
1763}
1764
1765fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
1766 value
1767 .as_i64()
1768 .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
1769 .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
1770}
1771
1772fn parse_string_array(value: &JsonValue) -> Vec<String> {
1773 let Some(array) = value.as_array() else {
1774 return Vec::new();
1775 };
1776 array
1777 .iter()
1778 .filter_map(|entry| {
1779 entry.as_str().map(ToString::to_string).or_else(|| {
1780 entry
1781 .get("id")
1782 .and_then(JsonValue::as_str)
1783 .map(ToString::to_string)
1784 })
1785 })
1786 .collect()
1787}
1788
/// Looks up a header by name, ignoring ASCII case.
///
/// Returns the value of the first entry, in the map's key order, whose key
/// matches `name`, or `None` when no key matches.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
1795
1796fn notion_payload(
1797 kind: &str,
1798 headers: &BTreeMap<String, String>,
1799 raw: JsonValue,
1800) -> ProviderPayload {
1801 let workspace_id = raw
1802 .get("workspace_id")
1803 .and_then(JsonValue::as_str)
1804 .map(ToString::to_string);
1805 ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
1806 event: kind.to_string(),
1807 workspace_id,
1808 request_id: headers
1809 .get("request-id")
1810 .cloned()
1811 .or_else(|| headers.get("x-request-id").cloned()),
1812 subscription_id: raw
1813 .get("subscription_id")
1814 .and_then(JsonValue::as_str)
1815 .map(ToString::to_string),
1816 integration_id: raw
1817 .get("integration_id")
1818 .and_then(JsonValue::as_str)
1819 .map(ToString::to_string),
1820 attempt_number: raw
1821 .get("attempt_number")
1822 .and_then(JsonValue::as_u64)
1823 .and_then(|value| u32::try_from(value).ok()),
1824 entity_id: raw
1825 .get("entity")
1826 .and_then(|value| value.get("id"))
1827 .and_then(JsonValue::as_str)
1828 .map(ToString::to_string),
1829 entity_type: raw
1830 .get("entity")
1831 .and_then(|value| value.get("type"))
1832 .and_then(JsonValue::as_str)
1833 .map(ToString::to_string),
1834 api_version: raw
1835 .get("api_version")
1836 .and_then(JsonValue::as_str)
1837 .map(ToString::to_string),
1838 verification_token: raw
1839 .get("verification_token")
1840 .and_then(JsonValue::as_str)
1841 .map(ToString::to_string),
1842 polled: None,
1843 raw,
1844 })))
1845}
1846
1847fn cron_payload(
1848 _kind: &str,
1849 _headers: &BTreeMap<String, String>,
1850 raw: JsonValue,
1851) -> ProviderPayload {
1852 let cron_id = raw
1853 .get("cron_id")
1854 .and_then(JsonValue::as_str)
1855 .map(ToString::to_string);
1856 let schedule = raw
1857 .get("schedule")
1858 .and_then(JsonValue::as_str)
1859 .map(ToString::to_string);
1860 let tick_at = raw
1861 .get("tick_at")
1862 .and_then(JsonValue::as_str)
1863 .and_then(parse_rfc3339)
1864 .unwrap_or_else(OffsetDateTime::now_utc);
1865 ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
1866 cron_id,
1867 schedule,
1868 tick_at,
1869 raw,
1870 }))
1871}
1872
1873fn webhook_payload(
1874 _kind: &str,
1875 headers: &BTreeMap<String, String>,
1876 raw: JsonValue,
1877) -> ProviderPayload {
1878 ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1879 source: headers.get("X-Webhook-Source").cloned(),
1880 content_type: headers.get("Content-Type").cloned(),
1881 raw,
1882 }))
1883}
1884
1885fn a2a_push_payload(
1886 _kind: &str,
1887 _headers: &BTreeMap<String, String>,
1888 raw: JsonValue,
1889) -> ProviderPayload {
1890 let task_id = raw
1891 .get("task_id")
1892 .and_then(JsonValue::as_str)
1893 .map(ToString::to_string);
1894 let sender = raw
1895 .get("sender")
1896 .and_then(JsonValue::as_str)
1897 .map(ToString::to_string);
1898 ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
1899 task_id,
1900 sender,
1901 raw,
1902 }))
1903}
1904
1905fn kafka_payload(
1906 kind: &str,
1907 headers: &BTreeMap<String, String>,
1908 raw: JsonValue,
1909) -> ProviderPayload {
1910 ProviderPayload::Known(KnownProviderPayload::Kafka(stream_payload(
1911 kind, headers, raw,
1912 )))
1913}
1914
1915fn nats_payload(kind: &str, headers: &BTreeMap<String, String>, raw: JsonValue) -> ProviderPayload {
1916 ProviderPayload::Known(KnownProviderPayload::Nats(stream_payload(
1917 kind, headers, raw,
1918 )))
1919}
1920
1921fn pulsar_payload(
1922 kind: &str,
1923 headers: &BTreeMap<String, String>,
1924 raw: JsonValue,
1925) -> ProviderPayload {
1926 ProviderPayload::Known(KnownProviderPayload::Pulsar(stream_payload(
1927 kind, headers, raw,
1928 )))
1929}
1930
1931fn postgres_cdc_payload(
1932 kind: &str,
1933 headers: &BTreeMap<String, String>,
1934 raw: JsonValue,
1935) -> ProviderPayload {
1936 ProviderPayload::Known(KnownProviderPayload::PostgresCdc(stream_payload(
1937 kind, headers, raw,
1938 )))
1939}
1940
1941fn email_payload(
1942 kind: &str,
1943 headers: &BTreeMap<String, String>,
1944 raw: JsonValue,
1945) -> ProviderPayload {
1946 ProviderPayload::Known(KnownProviderPayload::Email(stream_payload(
1947 kind, headers, raw,
1948 )))
1949}
1950
1951fn websocket_payload(
1952 kind: &str,
1953 headers: &BTreeMap<String, String>,
1954 raw: JsonValue,
1955) -> ProviderPayload {
1956 ProviderPayload::Known(KnownProviderPayload::Websocket(stream_payload(
1957 kind, headers, raw,
1958 )))
1959}
1960
1961fn stream_payload(
1962 kind: &str,
1963 headers: &BTreeMap<String, String>,
1964 raw: JsonValue,
1965) -> StreamEventPayload {
1966 StreamEventPayload {
1967 event: kind.to_string(),
1968 source: json_stringish(&raw, &["source", "connector", "origin"]),
1969 stream: json_stringish(
1970 &raw,
1971 &["stream", "topic", "subject", "channel", "mailbox", "slot"],
1972 ),
1973 partition: json_stringish(&raw, &["partition", "shard", "consumer"]),
1974 offset: json_stringish(&raw, &["offset", "sequence", "lsn", "message_id"]),
1975 key: json_stringish(&raw, &["key", "message_key", "id", "event_id"]),
1976 timestamp: json_stringish(&raw, &["timestamp", "occurred_at", "received_at", "ts"]),
1977 headers: headers.clone(),
1978 raw,
1979 }
1980}
1981
1982fn json_stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
1983 fields.iter().find_map(|field| {
1984 let value = raw.get(*field)?;
1985 value
1986 .as_str()
1987 .map(ToString::to_string)
1988 .or_else(|| parse_json_i64ish(value).map(|number| number.to_string()))
1989 .or_else(|| value.as_u64().map(|number| number.to_string()))
1990 })
1991}
1992
1993fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
1994 OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
1995}
1996
#[cfg(test)]
mod tests {
    use super::*;

    /// Header fixture mixing credential-style headers with harmless ones.
    fn sample_headers() -> BTreeMap<String, String> {
        BTreeMap::from(
            [
                ("Authorization", "Bearer secret"),
                ("Cookie", "session=abc"),
                ("User-Agent", "GitHub-Hookshot/123"),
                ("X-GitHub-Delivery", "delivery-123"),
                ("X-GitHub-Event", "issues"),
                ("X-Webhook-Token", "token"),
            ]
            .map(|(name, value)| (name.to_string(), value.to_string())),
        )
    }

    /// Minimal GitHub schema used to exercise catalog registration.
    fn placeholder_github_schema() -> Arc<dyn ProviderSchema> {
        Arc::new(BuiltinProviderSchema {
            provider_id: "github",
            harn_schema_name: "GitHubEventPayload",
            metadata: provider_metadata_entry(
                "github",
                &["webhook"],
                "GitHubEventPayload",
                &[],
                SignatureVerificationMetadata::None,
                Vec::new(),
                ProviderRuntimeMetadata::Placeholder,
            ),
            normalize: github_payload,
        })
    }

    #[test]
    fn default_redaction_policy_keeps_safe_headers() {
        let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
        let expectations = [
            ("User-Agent", "GitHub-Hookshot/123"),
            ("X-GitHub-Delivery", "delivery-123"),
            ("Authorization", REDACTED_HEADER_VALUE),
            ("Cookie", REDACTED_HEADER_VALUE),
            ("X-Webhook-Token", REDACTED_HEADER_VALUE),
        ];
        for (name, expected) in expectations {
            assert_eq!(redacted.get(name).unwrap(), expected);
        }
    }

    #[test]
    fn provider_catalog_rejects_duplicates() {
        let mut catalog = ProviderCatalog::default();
        catalog.register(placeholder_github_schema()).unwrap();

        // Registering the same provider id a second time must fail.
        let error = catalog
            .register(placeholder_github_schema())
            .unwrap_err();
        assert_eq!(
            error,
            ProviderCatalogError::DuplicateProvider("github".to_string())
        );
    }

    #[test]
    fn registered_provider_metadata_marks_builtin_connectors() {
        let entries = registered_provider_metadata();
        let builtin: Vec<&ProviderMetadata> = entries
            .iter()
            .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
            .collect();

        assert_eq!(builtin.len(), 6);
        for expected in ["cron", "github", "linear", "notion", "slack", "webhook"] {
            assert!(builtin.iter().any(|entry| entry.provider == expected));
        }

        let kafka = entries
            .iter()
            .find(|entry| entry.provider == "kafka")
            .expect("kafka stream provider");
        assert_eq!(kafka.kinds, vec!["stream".to_string()]);
        assert_eq!(kafka.schema_name, "StreamEventPayload");
        assert!(matches!(
            kafka.runtime,
            ProviderRuntimeMetadata::Placeholder
        ));
    }

    #[test]
    fn trigger_event_round_trip_is_stable() {
        let provider = ProviderId::from("github");
        let headers = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
        let body = serde_json::json!({
            "action": "opened",
            "installation": {"id": 42},
            "issue": {"number": 99}
        });
        let provider_payload =
            ProviderPayload::normalize(&provider, "issues", &sample_headers(), body).unwrap();
        let received_at = parse_rfc3339("2026-04-19T07:00:00Z").unwrap();
        let occurred_at = Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap());
        let event = TriggerEvent {
            id: TriggerEventId("trigger_evt_fixed".to_string()),
            provider,
            kind: "issues".to_string(),
            received_at,
            occurred_at,
            dedupe_key: "delivery-123".to_string(),
            trace_id: TraceId("trace_fixed".to_string()),
            tenant_id: Some(TenantId("tenant_1".to_string())),
            headers,
            provider_payload,
            signature_status: SignatureStatus::Verified,
            dedupe_claimed: false,
            batch: None,
            raw_body: Some(vec![0, 159, 255, 10]),
        };

        // Serialize, deserialize, and serialize again: both the value and its
        // JSON form must survive unchanged.
        let first_pass = serde_json::to_value(&event).unwrap();
        assert_eq!(first_pass["raw_body"], serde_json::json!("AJ//Cg=="));
        let decoded: TriggerEvent = serde_json::from_value(first_pass.clone()).unwrap();
        let second_pass = serde_json::to_value(&decoded).unwrap();
        assert_eq!(decoded, event);
        assert_eq!(first_pass, second_pass);
    }

    #[test]
    fn unknown_provider_errors() {
        let provider = ProviderId::from("custom-provider");
        let outcome = ProviderPayload::normalize(
            &provider,
            "thing.happened",
            &BTreeMap::new(),
            serde_json::json!({"ok": true}),
        );
        assert_eq!(
            outcome.unwrap_err(),
            ProviderCatalogError::UnknownProvider("custom-provider".to_string())
        );
    }

    #[test]
    fn provider_normalizes_stream_payloads() {
        let headers = BTreeMap::from([("x-source".to_string(), "feed".to_string())]);
        let normalized = ProviderPayload::normalize(
            &ProviderId::from("kafka"),
            "quote.tick",
            &headers,
            serde_json::json!({
                "topic": "quotes",
                "partition": 7,
                "offset": "42",
                "key": "AAPL",
                "timestamp": "2026-04-21T12:00:00Z"
            }),
        )
        .expect("stream payload");
        let stream = match normalized {
            ProviderPayload::Known(KnownProviderPayload::Kafka(inner)) => inner,
            _ => panic!("expected kafka stream payload"),
        };
        assert_eq!(stream.event, "quote.tick");
        assert_eq!(stream.stream.as_deref(), Some("quotes"));
        assert_eq!(stream.partition.as_deref(), Some("7"));
        assert_eq!(stream.offset.as_deref(), Some("42"));
        assert_eq!(stream.key.as_deref(), Some("AAPL"));
        assert_eq!(stream.timestamp.as_deref(), Some("2026-04-21T12:00:00Z"));
    }
}