1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
/// Replacement text that `redact_headers` stores in place of a redacted header value.
const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13fn serialize_optional_bytes_b64<S>(
14 value: &Option<Vec<u8>>,
15 serializer: S,
16) -> Result<S::Ok, S::Error>
17where
18 S: Serializer,
19{
20 use base64::Engine;
21
22 match value {
23 Some(bytes) => {
24 serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
25 }
26 None => serializer.serialize_none(),
27 }
28}
29
30fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
31where
32 D: Deserializer<'de>,
33{
34 use base64::Engine;
35
36 let encoded = Option::<String>::deserialize(deserializer)?;
37 encoded
38 .map(|text| {
39 base64::engine::general_purpose::STANDARD
40 .decode(text.as_bytes())
41 .map_err(serde::de::Error::custom)
42 })
43 .transpose()
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
47#[serde(transparent)]
48pub struct TriggerEventId(pub String);
49
50impl TriggerEventId {
51 pub fn new() -> Self {
52 Self(format!("trigger_evt_{}", Uuid::now_v7()))
53 }
54}
55
56impl Default for TriggerEventId {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
63#[serde(transparent)]
64pub struct ProviderId(pub String);
65
66impl ProviderId {
67 pub fn new(value: impl Into<String>) -> Self {
68 Self(value.into())
69 }
70
71 pub fn as_str(&self) -> &str {
72 self.0.as_str()
73 }
74}
75
76impl From<&str> for ProviderId {
77 fn from(value: &str) -> Self {
78 Self::new(value)
79 }
80}
81
82impl From<String> for ProviderId {
83 fn from(value: String) -> Self {
84 Self::new(value)
85 }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
89#[serde(transparent)]
90pub struct TraceId(pub String);
91
92impl TraceId {
93 pub fn new() -> Self {
94 Self(format!("trace_{}", Uuid::now_v7()))
95 }
96}
97
98impl Default for TraceId {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
105#[serde(transparent)]
106pub struct TenantId(pub String);
107
108impl TenantId {
109 pub fn new(value: impl Into<String>) -> Self {
110 Self(value.into())
111 }
112}
113
/// Signature state stored in `TriggerEvent::signature_status`.
///
/// Serialized as an internally tagged object: the `state` key holds the
/// snake_case variant name (`verified`, `unsigned`, `failed`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum SignatureStatus {
    Verified,
    Unsigned,
    /// Carries a free-form `reason` string next to the `state` tag.
    Failed { reason: String },
}
121
/// Fields shared by every GitHub payload struct (each one flattens this struct
/// into itself); also the payload of `GitHubEventPayload::Other`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubEventCommon {
    /// Event name; `GitHubEventPayload`'s deserializer picks the variant from this value.
    pub event: String,
    pub action: Option<String>,
    pub delivery_id: Option<String>,
    pub installation_id: Option<i64>,
    // The next three fields default to `None` when absent and are left out of
    // the serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub topic: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub repository: Option<JsonValue>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub repo: Option<JsonValue>,
    /// Arbitrary JSON; presumably the provider's original payload — confirm against the normalizer.
    pub raw: JsonValue,
}
136
/// Payload chosen when `event` is `"issues"`: the common fields plus `issue`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubIssuesEventPayload {
    /// Common fields, serialized at the same JSON level as `issue`.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub issue: JsonValue,
}
143
/// Payload chosen when `event` is `"pull_request"`: the common fields plus `pull_request`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPullRequestEventPayload {
    /// Common fields, serialized at the same JSON level as `pull_request`.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub pull_request: JsonValue,
}
150
/// Payload chosen when `event` is `"issue_comment"`: the common fields plus
/// `issue` and `comment`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubIssueCommentEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub issue: JsonValue,
    pub comment: JsonValue,
}
158
/// Payload chosen when `event` is `"pull_request_review"`: the common fields
/// plus `pull_request` and `review`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPullRequestReviewEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub pull_request: JsonValue,
    pub review: JsonValue,
}
166
/// Payload chosen when `event` is `"push"`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPushEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    /// Defaults to an empty list when the key is absent.
    #[serde(default)]
    pub commits: Vec<JsonValue>,
    pub distinct_size: Option<i64>,
}
175
/// Payload chosen when `event` is `"workflow_run"`: the common fields plus `workflow_run`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubWorkflowRunEventPayload {
    /// Common fields, serialized at the same JSON level as `workflow_run`.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub workflow_run: JsonValue,
}
182
/// Payload chosen when `event` is `"deployment_status"`: the common fields
/// plus `deployment_status` and `deployment`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubDeploymentStatusEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub deployment_status: JsonValue,
    pub deployment: JsonValue,
}
190
/// Payload chosen when `event` is `"check_run"`: the common fields plus `check_run`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubCheckRunEventPayload {
    /// Common fields, serialized at the same JSON level as `check_run`.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub check_run: JsonValue,
}
197
/// Payload chosen when `event` is `"check_suite"`.
///
/// Besides the required `check_suite` JSON, it carries optional scalar fields;
/// each of those defaults to `None` when absent and is omitted from the
/// serialized output when `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubCheckSuiteEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub check_suite: JsonValue,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub check_suite_id: Option<i64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pull_request_number: Option<i64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub head_sha: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub head_ref: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub base_ref: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub status: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub conclusion: Option<String>,
}
218
/// Payload chosen when `event` is `"status"`.
///
/// Every field beyond the common ones is optional: it defaults to `None` when
/// absent and is omitted from the serialized output when `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubStatusEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub commit_status: Option<JsonValue>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub status_id: Option<i64>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub head_sha: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub head_ref: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub base_ref: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub state: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub context: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub target_url: Option<String>,
}
240
/// Payload chosen when `event` is `"merge_group"`.
///
/// `merge_group` is required; the `Option` fields default to `None` when
/// absent and are omitted when `None`; the two list fields default to empty.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubMergeGroupEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub merge_group: JsonValue,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub merge_group_id: Option<JsonValue>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub head_sha: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub head_ref: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub base_sha: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub base_ref: Option<String>,
    #[serde(default)]
    pub pull_requests: Vec<JsonValue>,
    #[serde(default)]
    pub pull_request_numbers: Vec<i64>,
}
261
/// Payload chosen when `event` is `"installation"`.
///
/// The `Option` fields default to `None` when absent and are omitted when
/// `None`; `repositories` defaults to an empty list.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubInstallationEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub installation: Option<JsonValue>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub account: Option<JsonValue>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub installation_state: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub suspended: Option<bool>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub revoked: Option<bool>,
    #[serde(default)]
    pub repositories: Vec<JsonValue>,
}
279
/// Payload chosen when `event` is `"installation_repositories"`.
///
/// The `Option` fields default to `None` when absent and are omitted when
/// `None`; the added/removed repository lists default to empty.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubInstallationRepositoriesEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub installation: Option<JsonValue>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub account: Option<JsonValue>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub installation_state: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub suspended: Option<bool>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub revoked: Option<bool>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub repository_selection: Option<String>,
    #[serde(default)]
    pub repositories_added: Vec<JsonValue>,
    #[serde(default)]
    pub repositories_removed: Vec<JsonValue>,
}
301
/// All GitHub payload shapes.
///
/// Only `Serialize` is derived, and it is untagged: a value is written as its
/// inner payload struct with no variant marker. `Deserialize` is implemented by
/// hand and selects the variant from the `event` field.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(untagged)]
pub enum GitHubEventPayload {
    Issues(GitHubIssuesEventPayload),
    PullRequest(GitHubPullRequestEventPayload),
    IssueComment(GitHubIssueCommentEventPayload),
    PullRequestReview(GitHubPullRequestReviewEventPayload),
    Push(GitHubPushEventPayload),
    WorkflowRun(GitHubWorkflowRunEventPayload),
    DeploymentStatus(GitHubDeploymentStatusEventPayload),
    CheckRun(GitHubCheckRunEventPayload),
    CheckSuite(GitHubCheckSuiteEventPayload),
    Status(GitHubStatusEventPayload),
    MergeGroup(GitHubMergeGroupEventPayload),
    Installation(GitHubInstallationEventPayload),
    InstallationRepositories(GitHubInstallationRepositoriesEventPayload),
    /// Used for any `event` value without a dedicated variant; holds only the common fields.
    Other(GitHubEventCommon),
}
320
321impl<'de> Deserialize<'de> for GitHubEventPayload {
326 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
327 where
328 D: Deserializer<'de>,
329 {
330 let value = JsonValue::deserialize(deserializer)?;
331 let kind = value
332 .get("event")
333 .and_then(JsonValue::as_str)
334 .unwrap_or("")
335 .to_string();
336 let from_value = |v: JsonValue| -> Result<GitHubEventPayload, D::Error> {
337 let payload = match kind.as_str() {
338 "issues" => GitHubEventPayload::Issues(
339 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
340 ),
341 "pull_request" => GitHubEventPayload::PullRequest(
342 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
343 ),
344 "issue_comment" => GitHubEventPayload::IssueComment(
345 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
346 ),
347 "pull_request_review" => GitHubEventPayload::PullRequestReview(
348 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
349 ),
350 "push" => GitHubEventPayload::Push(
351 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
352 ),
353 "workflow_run" => GitHubEventPayload::WorkflowRun(
354 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
355 ),
356 "deployment_status" => GitHubEventPayload::DeploymentStatus(
357 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
358 ),
359 "check_run" => GitHubEventPayload::CheckRun(
360 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
361 ),
362 "check_suite" => GitHubEventPayload::CheckSuite(
363 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
364 ),
365 "status" => GitHubEventPayload::Status(
366 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
367 ),
368 "merge_group" => GitHubEventPayload::MergeGroup(
369 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
370 ),
371 "installation" => GitHubEventPayload::Installation(
372 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
373 ),
374 "installation_repositories" => GitHubEventPayload::InstallationRepositories(
375 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
376 ),
377 _ => GitHubEventPayload::Other(
378 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
379 ),
380 };
381 Ok(payload)
382 };
383 from_value(value)
384 }
385}
386
/// Fields shared by every Slack payload struct (each one flattens this struct
/// into itself); also the payload of `SlackEventPayload::Other`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackEventCommon {
    pub event: String,
    pub event_id: Option<String>,
    pub api_app_id: Option<String>,
    pub team_id: Option<String>,
    pub channel_id: Option<String>,
    pub user_id: Option<String>,
    pub event_ts: Option<String>,
    /// Arbitrary JSON; presumably the provider's original payload — confirm against the normalizer.
    pub raw: JsonValue,
}
398
/// Data carried by `SlackEventPayload::Message`.
///
/// Every field besides the flattened common ones is an `Option`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackMessageEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub subtype: Option<String>,
    pub channel_type: Option<String>,
    pub channel: Option<String>,
    pub user: Option<String>,
    pub text: Option<String>,
    pub ts: Option<String>,
    pub thread_ts: Option<String>,
}
411
/// Data carried by `SlackEventPayload::AppMention`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackAppMentionEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub channel: Option<String>,
    pub user: Option<String>,
    pub text: Option<String>,
    pub ts: Option<String>,
    pub thread_ts: Option<String>,
}
422
/// Data carried by `SlackEventPayload::ReactionAdded`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackReactionAddedEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub reaction: Option<String>,
    pub item_user: Option<String>,
    pub item: JsonValue,
}
431
/// Data carried by `SlackEventPayload::AppHomeOpened`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackAppHomeOpenedEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub user: Option<String>,
    pub channel: Option<String>,
    pub tab: Option<String>,
    pub view: JsonValue,
}
441
/// Data carried by `SlackEventPayload::AssistantThreadStarted`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackAssistantThreadStartedEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub assistant_thread: JsonValue,
    pub thread_ts: Option<String>,
    pub context: JsonValue,
}
450
/// All Slack payload shapes, serialized untagged (the inner struct alone, with
/// no variant marker).
///
/// NOTE(review): the derived untagged `Deserialize` tries variants in
/// declaration order and keeps the first one that fits. Every field
/// `SlackMessageEventPayload` adds to the common fields is an `Option`, so
/// `Message` looks like it would accept any input that `Other` accepts —
/// confirm whether deserialized values are expected to keep their original
/// variant (the GitHub payload enum uses a hand-written deserializer keyed on
/// `event` instead).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum SlackEventPayload {
    Message(SlackMessageEventPayload),
    AppMention(SlackAppMentionEventPayload),
    ReactionAdded(SlackReactionAddedEventPayload),
    AppHomeOpened(SlackAppHomeOpenedEventPayload),
    AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
    /// Holds only the common fields.
    Other(SlackEventCommon),
}
461
/// Fields shared by every Linear payload struct (each one flattens this struct
/// into itself); also the payload of `LinearEventPayload::Other`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearEventCommon {
    pub event: String,
    pub action: Option<String>,
    pub delivery_id: Option<String>,
    pub organization_id: Option<String>,
    pub webhook_timestamp: Option<i64>,
    pub webhook_id: Option<String>,
    pub url: Option<String>,
    pub created_at: Option<String>,
    pub actor: JsonValue,
    /// Arbitrary JSON; presumably the provider's original payload — confirm against the normalizer.
    pub raw: JsonValue,
}
475
/// One changed field of a Linear issue together with the `previous` value
/// recorded for it.
///
/// Serialization and deserialization are hand-written: the JSON form is an
/// object with a `field_name` key (snake_case of the variant name), a
/// `previous` key, and — for `Other` — an additional `field` key.
#[derive(Clone, Debug, PartialEq)]
pub enum LinearIssueChange {
    Title { previous: Option<String> },
    Description { previous: Option<String> },
    Priority { previous: Option<i64> },
    Estimate { previous: Option<i64> },
    StateId { previous: Option<String> },
    TeamId { previous: Option<String> },
    AssigneeId { previous: Option<String> },
    ProjectId { previous: Option<String> },
    CycleId { previous: Option<String> },
    DueDate { previous: Option<String> },
    ParentId { previous: Option<String> },
    SortOrder { previous: Option<f64> },
    LabelIds { previous: Vec<String> },
    CompletedAt { previous: Option<String> },
    /// Any field without a dedicated variant; `field` names it and `previous` is kept as raw JSON.
    Other { field: String, previous: JsonValue },
}
494
495impl Serialize for LinearIssueChange {
496 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
497 where
498 S: Serializer,
499 {
500 let value = match self {
501 Self::Title { previous } => {
502 serde_json::json!({ "field_name": "title", "previous": previous })
503 }
504 Self::Description { previous } => {
505 serde_json::json!({ "field_name": "description", "previous": previous })
506 }
507 Self::Priority { previous } => {
508 serde_json::json!({ "field_name": "priority", "previous": previous })
509 }
510 Self::Estimate { previous } => {
511 serde_json::json!({ "field_name": "estimate", "previous": previous })
512 }
513 Self::StateId { previous } => {
514 serde_json::json!({ "field_name": "state_id", "previous": previous })
515 }
516 Self::TeamId { previous } => {
517 serde_json::json!({ "field_name": "team_id", "previous": previous })
518 }
519 Self::AssigneeId { previous } => {
520 serde_json::json!({ "field_name": "assignee_id", "previous": previous })
521 }
522 Self::ProjectId { previous } => {
523 serde_json::json!({ "field_name": "project_id", "previous": previous })
524 }
525 Self::CycleId { previous } => {
526 serde_json::json!({ "field_name": "cycle_id", "previous": previous })
527 }
528 Self::DueDate { previous } => {
529 serde_json::json!({ "field_name": "due_date", "previous": previous })
530 }
531 Self::ParentId { previous } => {
532 serde_json::json!({ "field_name": "parent_id", "previous": previous })
533 }
534 Self::SortOrder { previous } => {
535 serde_json::json!({ "field_name": "sort_order", "previous": previous })
536 }
537 Self::LabelIds { previous } => {
538 serde_json::json!({ "field_name": "label_ids", "previous": previous })
539 }
540 Self::CompletedAt { previous } => {
541 serde_json::json!({ "field_name": "completed_at", "previous": previous })
542 }
543 Self::Other { field, previous } => {
544 serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
545 }
546 };
547 value.serialize(serializer)
548 }
549}
550
551impl<'de> Deserialize<'de> for LinearIssueChange {
552 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
553 where
554 D: Deserializer<'de>,
555 {
556 let value = JsonValue::deserialize(deserializer)?;
557 let field_name = value
558 .get("field_name")
559 .and_then(JsonValue::as_str)
560 .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
561 let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
562 Ok(match field_name {
563 "title" => Self::Title {
564 previous: previous.as_str().map(ToString::to_string),
565 },
566 "description" => Self::Description {
567 previous: previous.as_str().map(ToString::to_string),
568 },
569 "priority" => Self::Priority {
570 previous: parse_json_i64ish(&previous),
571 },
572 "estimate" => Self::Estimate {
573 previous: parse_json_i64ish(&previous),
574 },
575 "state_id" => Self::StateId {
576 previous: previous.as_str().map(ToString::to_string),
577 },
578 "team_id" => Self::TeamId {
579 previous: previous.as_str().map(ToString::to_string),
580 },
581 "assignee_id" => Self::AssigneeId {
582 previous: previous.as_str().map(ToString::to_string),
583 },
584 "project_id" => Self::ProjectId {
585 previous: previous.as_str().map(ToString::to_string),
586 },
587 "cycle_id" => Self::CycleId {
588 previous: previous.as_str().map(ToString::to_string),
589 },
590 "due_date" => Self::DueDate {
591 previous: previous.as_str().map(ToString::to_string),
592 },
593 "parent_id" => Self::ParentId {
594 previous: previous.as_str().map(ToString::to_string),
595 },
596 "sort_order" => Self::SortOrder {
597 previous: previous.as_f64(),
598 },
599 "label_ids" => Self::LabelIds {
600 previous: parse_string_array(&previous),
601 },
602 "completed_at" => Self::CompletedAt {
603 previous: previous.as_str().map(ToString::to_string),
604 },
605 "other" => Self::Other {
606 field: value
607 .get("field")
608 .and_then(JsonValue::as_str)
609 .map(ToString::to_string)
610 .unwrap_or_else(|| "unknown".to_string()),
611 previous,
612 },
613 other => Self::Other {
614 field: other.to_string(),
615 previous,
616 },
617 })
618 }
619}
620
/// Data carried by `LinearEventPayload::Issue`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearIssueEventPayload {
    /// Common fields, serialized at the same JSON level as the other fields.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub issue: JsonValue,
    /// Defaults to an empty list when the key is absent.
    #[serde(default)]
    pub changes: Vec<LinearIssueChange>,
}
629
/// Data carried by `LinearEventPayload::IssueComment`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearIssueCommentEventPayload {
    /// Common fields, serialized at the same JSON level as `comment`.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub comment: JsonValue,
}
636
/// Data carried by `LinearEventPayload::IssueLabel`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearIssueLabelEventPayload {
    /// Common fields, serialized at the same JSON level as `label`.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub label: JsonValue,
}
643
/// Data carried by `LinearEventPayload::Project`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearProjectEventPayload {
    /// Common fields, serialized at the same JSON level as `project`.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub project: JsonValue,
}
650
/// Data carried by `LinearEventPayload::Cycle`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearCycleEventPayload {
    /// Common fields, serialized at the same JSON level as `cycle`.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub cycle: JsonValue,
}
657
/// Data carried by `LinearEventPayload::Customer`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearCustomerEventPayload {
    /// Common fields, serialized at the same JSON level as `customer`.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub customer: JsonValue,
}
664
/// Data carried by `LinearEventPayload::CustomerRequest`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearCustomerRequestEventPayload {
    /// Common fields, serialized at the same JSON level as `customer_request`.
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub customer_request: JsonValue,
}
671
/// All Linear payload shapes, serialized untagged (the inner struct alone, with
/// no variant marker).
///
/// The derived untagged `Deserialize` tries variants in declaration order and
/// keeps the first one that fits, so the order of the variants below matters.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum LinearEventPayload {
    Issue(LinearIssueEventPayload),
    IssueComment(LinearIssueCommentEventPayload),
    IssueLabel(LinearIssueLabelEventPayload),
    Project(LinearProjectEventPayload),
    Cycle(LinearCycleEventPayload),
    Customer(LinearCustomerEventPayload),
    CustomerRequest(LinearCustomerRequestEventPayload),
    /// Holds only the common fields; tried last.
    Other(LinearEventCommon),
}
684
/// Value of the optional `polled` field on `NotionEventPayload`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct NotionPolledChangeEvent {
    pub resource: String,
    pub source_id: String,
    pub entity_id: String,
    pub high_water_mark: String,
    /// Optional JSON; presumably the entity state prior to the change — confirm.
    pub before: Option<JsonValue>,
    /// Required JSON; presumably the entity state following the change — confirm.
    pub after: JsonValue,
}
694
/// Data carried (boxed) by `KnownProviderPayload::Notion`.
///
/// Only `event` and `raw` are required; every other field is an `Option`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct NotionEventPayload {
    pub event: String,
    pub workspace_id: Option<String>,
    pub request_id: Option<String>,
    pub subscription_id: Option<String>,
    pub integration_id: Option<String>,
    pub attempt_number: Option<u32>,
    pub entity_id: Option<String>,
    pub entity_type: Option<String>,
    pub api_version: Option<String>,
    pub verification_token: Option<String>,
    pub polled: Option<NotionPolledChangeEvent>,
    /// Arbitrary JSON; presumably the provider's original payload — confirm against the normalizer.
    pub raw: JsonValue,
}
710
/// Data carried by `KnownProviderPayload::Cron`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct CronEventPayload {
    pub cron_id: Option<String>,
    pub schedule: Option<String>,
    /// Serialized and parsed as an RFC 3339 timestamp string.
    #[serde(with = "time::serde::rfc3339")]
    pub tick_at: OffsetDateTime,
    pub raw: JsonValue,
}
719
/// Data carried by `KnownProviderPayload::Webhook`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GenericWebhookPayload {
    pub source: Option<String>,
    pub content_type: Option<String>,
    pub raw: JsonValue,
}
726
/// Data carried by `KnownProviderPayload::A2aPush`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct A2aPushPayload {
    pub task_id: Option<String>,
    pub task_state: Option<String>,
    pub artifact: Option<JsonValue>,
    pub sender: Option<String>,
    pub raw: JsonValue,
    /// Required string, serialized under the key `kind`.
    pub kind: String,
}
736
/// Payload shape shared by the `Kafka`, `Nats`, `Pulsar`, `PostgresCdc`,
/// `Email` and `Websocket` variants of `KnownProviderPayload`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct StreamEventPayload {
    pub event: String,
    pub source: Option<String>,
    pub stream: Option<String>,
    pub partition: Option<String>,
    pub offset: Option<String>,
    pub key: Option<String>,
    pub timestamp: Option<String>,
    /// Defaults to an empty map when the key is absent.
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    pub raw: JsonValue,
}
750
/// Data carried by `ProviderPayload::Extension`; unlike the known payloads it
/// stores its provider name in a field.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ExtensionProviderPayload {
    /// Returned by `ProviderPayload::provider` for extension payloads.
    pub provider: String,
    pub schema_name: String,
    pub raw: JsonValue,
}
757
/// Provider payload attached to a `TriggerEvent`: either one of the known
/// provider payloads or an extension payload.
///
/// Untagged: serialized as the inner value alone. On deserialize serde tries
/// `Known` first and falls back to `Extension`.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ProviderPayload {
    Known(KnownProviderPayload),
    Extension(ExtensionProviderPayload),
}
765
766impl ProviderPayload {
767 pub fn provider(&self) -> &str {
768 match self {
769 Self::Known(known) => known.provider(),
770 Self::Extension(payload) => payload.provider.as_str(),
771 }
772 }
773
774 pub fn normalize(
775 provider: &ProviderId,
776 kind: &str,
777 headers: &BTreeMap<String, String>,
778 raw: JsonValue,
779 ) -> Result<Self, ProviderCatalogError> {
780 provider_catalog()
781 .read()
782 .expect("provider catalog poisoned")
783 .normalize(provider, kind, headers, raw)
784 }
785}
786
/// Payloads of the providers this module knows by name.
///
/// Internally tagged: the `provider` key holds the renamed variant name, which
/// is the same string `KnownProviderPayload::provider` returns for that variant.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "provider")]
pub enum KnownProviderPayload {
    #[serde(rename = "github")]
    GitHub(GitHubEventPayload),
    #[serde(rename = "slack")]
    Slack(Box<SlackEventPayload>),
    #[serde(rename = "linear")]
    Linear(LinearEventPayload),
    #[serde(rename = "notion")]
    Notion(Box<NotionEventPayload>),
    #[serde(rename = "cron")]
    Cron(CronEventPayload),
    #[serde(rename = "webhook")]
    Webhook(GenericWebhookPayload),
    #[serde(rename = "a2a-push")]
    A2aPush(A2aPushPayload),
    // The remaining variants all share `StreamEventPayload`.
    #[serde(rename = "kafka")]
    Kafka(StreamEventPayload),
    #[serde(rename = "nats")]
    Nats(StreamEventPayload),
    #[serde(rename = "pulsar")]
    Pulsar(StreamEventPayload),
    #[serde(rename = "postgres-cdc")]
    PostgresCdc(StreamEventPayload),
    #[serde(rename = "email")]
    Email(StreamEventPayload),
    #[serde(rename = "websocket")]
    Websocket(StreamEventPayload),
}
817
818impl KnownProviderPayload {
819 pub fn provider(&self) -> &str {
820 match self {
821 Self::GitHub(_) => "github",
822 Self::Slack(_) => "slack",
823 Self::Linear(_) => "linear",
824 Self::Notion(_) => "notion",
825 Self::Cron(_) => "cron",
826 Self::Webhook(_) => "webhook",
827 Self::A2aPush(_) => "a2a-push",
828 Self::Kafka(_) => "kafka",
829 Self::Nats(_) => "nats",
830 Self::Pulsar(_) => "pulsar",
831 Self::PostgresCdc(_) => "postgres-cdc",
832 Self::Email(_) => "email",
833 Self::Websocket(_) => "websocket",
834 }
835 }
836}
837
/// A normalized trigger event together with its provider payload and metadata.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerEvent {
    pub id: TriggerEventId,
    pub provider: ProviderId,
    pub kind: String,
    /// Serialized as an RFC 3339 timestamp string.
    #[serde(with = "time::serde::rfc3339")]
    pub received_at: OffsetDateTime,
    /// Serialized as an RFC 3339 timestamp string or null.
    #[serde(with = "time::serde::rfc3339::option")]
    pub occurred_at: Option<OffsetDateTime>,
    pub dedupe_key: String,
    pub trace_id: TraceId,
    pub tenant_id: Option<TenantId>,
    pub headers: BTreeMap<String, String>,
    /// Defaults to `None` when absent; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch: Option<Vec<JsonValue>>,
    /// Optional bytes, written as a base64 string by the two helper functions
    /// named below; defaults to `None` when absent and is omitted when `None`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        serialize_with = "serialize_optional_bytes_b64",
        deserialize_with = "deserialize_optional_bytes_b64"
    )]
    pub raw_body: Option<Vec<u8>>,
    pub provider_payload: ProviderPayload,
    pub signature_status: SignatureStatus,
    /// Never serialized; a deserialized event gets the `bool` default (`false`).
    #[serde(skip)]
    pub dedupe_claimed: bool,
}
865
866impl TriggerEvent {
867 #[allow(clippy::too_many_arguments)]
868 pub fn new(
869 provider: ProviderId,
870 kind: impl Into<String>,
871 occurred_at: Option<OffsetDateTime>,
872 dedupe_key: impl Into<String>,
873 tenant_id: Option<TenantId>,
874 headers: BTreeMap<String, String>,
875 provider_payload: ProviderPayload,
876 signature_status: SignatureStatus,
877 ) -> Self {
878 Self {
879 id: TriggerEventId::new(),
880 provider,
881 kind: kind.into(),
882 received_at: clock::now_utc(),
883 occurred_at,
884 dedupe_key: dedupe_key.into(),
885 trace_id: TraceId::new(),
886 tenant_id,
887 headers,
888 batch: None,
889 raw_body: None,
890 provider_payload,
891 signature_status,
892 dedupe_claimed: false,
893 }
894 }
895
896 pub fn dedupe_claimed(&self) -> bool {
897 self.dedupe_claimed
898 }
899
900 pub fn mark_dedupe_claimed(&mut self) {
901 self.dedupe_claimed = true;
902 }
903}
904
/// Rules deciding which header values are replaced during redaction.
///
/// Holds a set of lowercase header names that are always kept. On top of that
/// set, a built-in list of names, two suffixes and two substrings also mark a
/// header as safe; a header that is not safe is redacted when its name
/// contains one of the sensitive substrings.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRedactionPolicy {
    safe_exact_names: BTreeSet<String>,
}

impl HeaderRedactionPolicy {
    /// Adds `name`, ASCII-lowercased, to the exact-match safe set and returns the policy.
    pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
        let normalized = name.into().to_ascii_lowercase();
        self.safe_exact_names.insert(normalized);
        self
    }

    /// Whether the header `name` is considered safe (compared ASCII case-insensitively).
    fn should_keep(&self, name: &str) -> bool {
        const BUILTIN_SAFE_NAMES: &[&str] = &[
            "user-agent",
            "request-id",
            "x-request-id",
            "x-correlation-id",
            "content-type",
            "content-length",
            "x-github-event",
            "x-github-delivery",
            "x-github-hook-id",
            "x-hub-signature-256",
            "x-slack-request-timestamp",
            "x-slack-signature",
            "x-linear-signature",
            "x-notion-signature",
            "x-a2a-signature",
            "x-a2a-delivery",
        ];
        const SAFE_SUFFIXES: &[&str] = &["-event", "-delivery"];
        const SAFE_FRAGMENTS: &[&str] = &["timestamp", "request-id"];

        let lowered = name.to_ascii_lowercase();
        let lowered = lowered.as_str();

        self.safe_exact_names.contains(lowered)
            || BUILTIN_SAFE_NAMES.contains(&lowered)
            || SAFE_SUFFIXES.iter().any(|&suffix| lowered.ends_with(suffix))
            || SAFE_FRAGMENTS.iter().any(|&fragment| lowered.contains(fragment))
    }

    /// Whether the value of header `name` must be replaced: the name is not
    /// safe per [`Self::should_keep`] and contains a sensitive substring.
    fn should_redact(&self, name: &str) -> bool {
        const SENSITIVE_FRAGMENTS: &[&str] =
            &["authorization", "cookie", "secret", "token", "key"];

        let lowered = name.to_ascii_lowercase();
        !self.should_keep(&lowered)
            && SENSITIVE_FRAGMENTS
                .iter()
                .any(|&fragment| lowered.contains(fragment))
    }
}

impl Default for HeaderRedactionPolicy {
    /// Policy whose exact-match safe set is pre-filled with the standard safe header names.
    fn default() -> Self {
        const DEFAULT_SAFE_NAMES: &[&str] = &[
            "content-length",
            "content-type",
            "request-id",
            "user-agent",
            "x-a2a-delivery",
            "x-a2a-signature",
            "x-correlation-id",
            "x-github-delivery",
            "x-github-event",
            "x-github-hook-id",
            "x-hub-signature-256",
            "x-linear-signature",
            "x-notion-signature",
            "x-request-id",
            "x-slack-request-timestamp",
            "x-slack-signature",
        ];

        Self {
            safe_exact_names: DEFAULT_SAFE_NAMES
                .iter()
                .map(|name| name.to_string())
                .collect(),
        }
    }
}
983
984pub fn redact_headers(
985 headers: &BTreeMap<String, String>,
986 policy: &HeaderRedactionPolicy,
987) -> BTreeMap<String, String> {
988 headers
989 .iter()
990 .map(|(name, value)| {
991 if policy.should_redact(name) {
992 (name.clone(), REDACTED_HEADER_VALUE.to_string())
993 } else {
994 (name.clone(), value.clone())
995 }
996 })
997 .collect()
998}
999
/// One entry of `ProviderMetadata::secret_requirements`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderSecretRequirement {
    pub name: String,
    /// Only entries with `required == true` are yielded by
    /// `ProviderMetadata::required_secret_names`.
    pub required: bool,
    pub namespace: String,
}
1006
/// One entry of `ProviderMetadata::outbound_methods`; currently just a name.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderOutboundMethod {
    pub name: String,
}
1011
/// Signature verification settings stored in `ProviderMetadata`.
///
/// Internally tagged: the `kind` key holds the snake_case variant name. The
/// default value is `None`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum SignatureVerificationMetadata {
    #[default]
    None,
    /// Parameters of an HMAC-based scheme; all values are plain strings,
    /// flags or optional numbers whose interpretation lies outside this type.
    Hmac {
        variant: String,
        raw_body: bool,
        signature_header: String,
        timestamp_header: Option<String>,
        id_header: Option<String>,
        default_tolerance_secs: Option<i64>,
        digest: String,
        encoding: String,
    },
}
1028
/// Runtime information stored in `ProviderMetadata`.
///
/// Internally tagged: the `kind` key holds the snake_case variant name. The
/// default value is `Placeholder`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ProviderRuntimeMetadata {
    Builtin {
        connector: String,
        default_signature_variant: Option<String>,
    },
    #[default]
    Placeholder,
}
1039
/// Descriptive metadata for a provider, as returned by `ProviderSchema::metadata`.
///
/// `provider` and `schema_name` are required when deserializing; every other
/// field falls back to its type's default when absent.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
pub struct ProviderMetadata {
    pub provider: String,
    /// Event kinds checked by `supports_kind`.
    #[serde(default)]
    pub kinds: Vec<String>,
    pub schema_name: String,
    #[serde(default)]
    pub outbound_methods: Vec<ProviderOutboundMethod>,
    #[serde(default)]
    pub secret_requirements: Vec<ProviderSecretRequirement>,
    #[serde(default)]
    pub signature_verification: SignatureVerificationMetadata,
    #[serde(default)]
    pub runtime: ProviderRuntimeMetadata,
}
1055
1056impl ProviderMetadata {
1057 pub fn supports_kind(&self, kind: &str) -> bool {
1058 self.kinds.iter().any(|candidate| candidate == kind)
1059 }
1060
1061 pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
1062 self.secret_requirements
1063 .iter()
1064 .filter(|requirement| requirement.required)
1065 .map(|requirement| requirement.name.as_str())
1066 }
1067}
1068
/// A provider that can be registered in a `ProviderCatalog` and can turn raw
/// inbound JSON into a `ProviderPayload`.
pub trait ProviderSchema: Send + Sync {
    /// Provider id; `ProviderCatalog::register` uses it as the catalog key.
    fn provider_id(&self) -> &str;
    /// Name of the payload schema reported for this provider.
    fn harn_schema_name(&self) -> &str;
    /// Metadata describing this provider.
    ///
    /// The default implementation fills in only `provider` and `schema_name`
    /// and leaves every other field at its default.
    fn metadata(&self) -> ProviderMetadata {
        ProviderMetadata {
            provider: self.provider_id().to_string(),
            schema_name: self.harn_schema_name().to_string(),
            ..ProviderMetadata::default()
        }
    }
    /// Converts the raw JSON body of an event of the given `kind`, together
    /// with its `headers`, into a `ProviderPayload`.
    fn normalize(
        &self,
        kind: &str,
        headers: &BTreeMap<String, String>,
        raw: JsonValue,
    ) -> Result<ProviderPayload, ProviderCatalogError>;
}
1086
/// Failures reported by provider catalog operations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProviderCatalogError {
    /// A schema with this provider id is already present in the catalog.
    DuplicateProvider(String),
    /// No schema is registered under this provider id.
    UnknownProvider(String),
}

impl std::fmt::Display for ProviderCatalogError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Both messages share the same frame; only the trailing clause differs.
        let (provider, problem) = match self {
            Self::DuplicateProvider(provider) => (provider, "is already registered"),
            Self::UnknownProvider(provider) => (provider, "is not registered"),
        };
        write!(f, "provider `{provider}` {problem}")
    }
}

impl std::error::Error for ProviderCatalogError {}
1105
/// A set of provider schemas keyed by provider id.
///
/// The derived `Default` is an empty catalog; `with_defaults` builds one
/// containing the built-in providers.
#[derive(Clone, Default)]
pub struct ProviderCatalog {
    /// Registered schemas, keyed by the value of `ProviderSchema::provider_id`.
    providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
}
1110
1111impl ProviderCatalog {
1112 pub fn with_defaults() -> Self {
1113 let mut catalog = Self::default();
1114 for schema in default_provider_schemas() {
1115 catalog
1116 .register(schema)
1117 .expect("default providers must register cleanly");
1118 }
1119 catalog
1120 }
1121
1122 pub fn with_defaults_and(
1123 schemas: Vec<Arc<dyn ProviderSchema>>,
1124 ) -> Result<Self, ProviderCatalogError> {
1125 let mut catalog = Self::with_defaults();
1126 let builtin_providers: BTreeSet<String> = catalog.schema_names().into_keys().collect();
1127 for schema in schemas {
1128 if builtin_providers.contains(schema.provider_id()) {
1129 continue;
1130 }
1131 catalog.register(schema)?;
1132 }
1133 Ok(catalog)
1134 }
1135
1136 pub fn register(
1137 &mut self,
1138 schema: Arc<dyn ProviderSchema>,
1139 ) -> Result<(), ProviderCatalogError> {
1140 let provider = schema.provider_id().to_string();
1141 if self.providers.contains_key(provider.as_str()) {
1142 return Err(ProviderCatalogError::DuplicateProvider(provider));
1143 }
1144 self.providers.insert(provider, schema);
1145 Ok(())
1146 }
1147
1148 pub fn normalize(
1149 &self,
1150 provider: &ProviderId,
1151 kind: &str,
1152 headers: &BTreeMap<String, String>,
1153 raw: JsonValue,
1154 ) -> Result<ProviderPayload, ProviderCatalogError> {
1155 let schema = self
1156 .providers
1157 .get(provider.as_str())
1158 .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
1159 schema.normalize(kind, headers, raw)
1160 }
1161
1162 pub fn schema_names(&self) -> BTreeMap<String, String> {
1163 self.providers
1164 .iter()
1165 .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
1166 .collect()
1167 }
1168
1169 pub fn entries(&self) -> Vec<ProviderMetadata> {
1170 self.providers
1171 .values()
1172 .map(|schema| schema.metadata())
1173 .collect()
1174 }
1175
1176 pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
1177 self.providers.get(provider).map(|schema| schema.metadata())
1178 }
1179}
1180
1181pub fn register_provider_schema(
1182 schema: Arc<dyn ProviderSchema>,
1183) -> Result<(), ProviderCatalogError> {
1184 provider_catalog()
1185 .write()
1186 .expect("provider catalog poisoned")
1187 .register(schema)
1188}
1189
1190pub fn reset_provider_catalog() {
1191 *provider_catalog()
1192 .write()
1193 .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
1194}
1195
1196pub fn reset_provider_catalog_with(
1197 schemas: Vec<Arc<dyn ProviderSchema>>,
1198) -> Result<(), ProviderCatalogError> {
1199 let catalog = ProviderCatalog::with_defaults_and(schemas)?;
1200 install_provider_catalog(catalog);
1201 Ok(())
1202}
1203
1204pub fn install_provider_catalog(catalog: ProviderCatalog) {
1205 *provider_catalog()
1206 .write()
1207 .expect("provider catalog poisoned") = catalog;
1208}
1209
1210pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
1211 provider_catalog()
1212 .read()
1213 .expect("provider catalog poisoned")
1214 .schema_names()
1215}
1216
1217pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
1218 provider_catalog()
1219 .read()
1220 .expect("provider catalog poisoned")
1221 .entries()
1222}
1223
1224pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
1225 provider_catalog()
1226 .read()
1227 .expect("provider catalog poisoned")
1228 .metadata_for(provider)
1229}
1230
1231fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
1232 static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
1233 PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
1234}
1235
/// `ProviderSchema` implementation backed by static data and a plain function
/// pointer; used for the providers returned by `default_provider_schemas`.
struct BuiltinProviderSchema {
    /// Value returned by `ProviderSchema::provider_id`.
    provider_id: &'static str,
    /// Value returned by `ProviderSchema::harn_schema_name`.
    harn_schema_name: &'static str,
    /// Metadata handed out (cloned) by `ProviderSchema::metadata`.
    metadata: ProviderMetadata,
    /// Infallible normalizer invoked with `(kind, headers, raw)`.
    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
}
1242
1243impl ProviderSchema for BuiltinProviderSchema {
1244 fn provider_id(&self) -> &str {
1245 self.provider_id
1246 }
1247
1248 fn harn_schema_name(&self) -> &str {
1249 self.harn_schema_name
1250 }
1251
1252 fn metadata(&self) -> ProviderMetadata {
1253 self.metadata.clone()
1254 }
1255
1256 fn normalize(
1257 &self,
1258 kind: &str,
1259 headers: &BTreeMap<String, String>,
1260 raw: JsonValue,
1261 ) -> Result<ProviderPayload, ProviderCatalogError> {
1262 Ok((self.normalize)(kind, headers, raw))
1263 }
1264}
1265
1266fn provider_metadata_entry(
1267 provider: &str,
1268 kinds: &[&str],
1269 schema_name: &str,
1270 outbound_methods: &[&str],
1271 signature_verification: SignatureVerificationMetadata,
1272 secret_requirements: Vec<ProviderSecretRequirement>,
1273 runtime: ProviderRuntimeMetadata,
1274) -> ProviderMetadata {
1275 ProviderMetadata {
1276 provider: provider.to_string(),
1277 kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
1278 schema_name: schema_name.to_string(),
1279 outbound_methods: outbound_methods
1280 .iter()
1281 .map(|name| ProviderOutboundMethod {
1282 name: (*name).to_string(),
1283 })
1284 .collect(),
1285 secret_requirements,
1286 signature_verification,
1287 runtime,
1288 }
1289}
1290
1291fn hmac_signature_metadata(
1292 variant: &str,
1293 signature_header: &str,
1294 timestamp_header: Option<&str>,
1295 id_header: Option<&str>,
1296 default_tolerance_secs: Option<i64>,
1297 encoding: &str,
1298) -> SignatureVerificationMetadata {
1299 SignatureVerificationMetadata::Hmac {
1300 variant: variant.to_string(),
1301 raw_body: true,
1302 signature_header: signature_header.to_string(),
1303 timestamp_header: timestamp_header.map(ToString::to_string),
1304 id_header: id_header.map(ToString::to_string),
1305 default_tolerance_secs,
1306 digest: "sha256".to_string(),
1307 encoding: encoding.to_string(),
1308 }
1309}
1310
1311fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1312 ProviderSecretRequirement {
1313 name: name.to_string(),
1314 required: true,
1315 namespace: namespace.to_string(),
1316 }
1317}
1318
1319fn outbound_method(name: &str) -> ProviderOutboundMethod {
1320 ProviderOutboundMethod {
1321 name: name.to_string(),
1322 }
1323}
1324
1325fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1326 ProviderSecretRequirement {
1327 name: name.to_string(),
1328 required: false,
1329 namespace: namespace.to_string(),
1330 }
1331}
1332
1333fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1334 vec![
1335 Arc::new(BuiltinProviderSchema {
1336 provider_id: "github",
1337 harn_schema_name: "GitHubEventPayload",
1338 metadata: provider_metadata_entry(
1339 "github",
1340 &["webhook"],
1341 "GitHubEventPayload",
1342 &[],
1343 hmac_signature_metadata(
1344 "github",
1345 "X-Hub-Signature-256",
1346 None,
1347 Some("X-GitHub-Delivery"),
1348 None,
1349 "hex",
1350 ),
1351 vec![required_secret("signing_secret", "github")],
1352 ProviderRuntimeMetadata::Placeholder,
1353 ),
1354 normalize: github_payload,
1355 }),
1356 Arc::new(BuiltinProviderSchema {
1357 provider_id: "slack",
1358 harn_schema_name: "SlackEventPayload",
1359 metadata: provider_metadata_entry(
1360 "slack",
1361 &["webhook"],
1362 "SlackEventPayload",
1363 &[
1364 "post_message",
1365 "update_message",
1366 "add_reaction",
1367 "open_view",
1368 "user_info",
1369 "api_call",
1370 "upload_file",
1371 ],
1372 hmac_signature_metadata(
1373 "slack",
1374 "X-Slack-Signature",
1375 Some("X-Slack-Request-Timestamp"),
1376 None,
1377 Some(300),
1378 "hex",
1379 ),
1380 vec![required_secret("signing_secret", "slack")],
1381 ProviderRuntimeMetadata::Placeholder,
1382 ),
1383 normalize: slack_payload,
1384 }),
1385 Arc::new(BuiltinProviderSchema {
1386 provider_id: "linear",
1387 harn_schema_name: "LinearEventPayload",
1388 metadata: {
1389 let mut metadata = provider_metadata_entry(
1390 "linear",
1391 &["webhook"],
1392 "LinearEventPayload",
1393 &[],
1394 hmac_signature_metadata(
1395 "linear",
1396 "Linear-Signature",
1397 None,
1398 Some("Linear-Delivery"),
1399 Some(75),
1400 "hex",
1401 ),
1402 vec![
1403 required_secret("signing_secret", "linear"),
1404 optional_secret("access_token", "linear"),
1405 ],
1406 ProviderRuntimeMetadata::Placeholder,
1407 );
1408 metadata.outbound_methods = vec![
1409 ProviderOutboundMethod {
1410 name: "list_issues".to_string(),
1411 },
1412 ProviderOutboundMethod {
1413 name: "update_issue".to_string(),
1414 },
1415 ProviderOutboundMethod {
1416 name: "create_comment".to_string(),
1417 },
1418 ProviderOutboundMethod {
1419 name: "search".to_string(),
1420 },
1421 ProviderOutboundMethod {
1422 name: "graphql".to_string(),
1423 },
1424 ];
1425 metadata
1426 },
1427 normalize: linear_payload,
1428 }),
1429 Arc::new(BuiltinProviderSchema {
1430 provider_id: "notion",
1431 harn_schema_name: "NotionEventPayload",
1432 metadata: {
1433 let mut metadata = provider_metadata_entry(
1434 "notion",
1435 &["webhook", "poll"],
1436 "NotionEventPayload",
1437 &[],
1438 hmac_signature_metadata(
1439 "notion",
1440 "X-Notion-Signature",
1441 None,
1442 None,
1443 None,
1444 "hex",
1445 ),
1446 vec![required_secret("verification_token", "notion")],
1447 ProviderRuntimeMetadata::Placeholder,
1448 );
1449 metadata.outbound_methods = vec![
1450 outbound_method("get_page"),
1451 outbound_method("update_page"),
1452 outbound_method("append_blocks"),
1453 outbound_method("query_database"),
1454 outbound_method("search"),
1455 outbound_method("create_comment"),
1456 outbound_method("api_call"),
1457 ];
1458 metadata
1459 },
1460 normalize: notion_payload,
1461 }),
1462 Arc::new(BuiltinProviderSchema {
1463 provider_id: "cron",
1464 harn_schema_name: "CronEventPayload",
1465 metadata: provider_metadata_entry(
1466 "cron",
1467 &["cron"],
1468 "CronEventPayload",
1469 &[],
1470 SignatureVerificationMetadata::None,
1471 Vec::new(),
1472 ProviderRuntimeMetadata::Builtin {
1473 connector: "cron".to_string(),
1474 default_signature_variant: None,
1475 },
1476 ),
1477 normalize: cron_payload,
1478 }),
1479 Arc::new(BuiltinProviderSchema {
1480 provider_id: "webhook",
1481 harn_schema_name: "GenericWebhookPayload",
1482 metadata: provider_metadata_entry(
1483 "webhook",
1484 &["webhook"],
1485 "GenericWebhookPayload",
1486 &[],
1487 hmac_signature_metadata(
1488 "standard",
1489 "webhook-signature",
1490 Some("webhook-timestamp"),
1491 Some("webhook-id"),
1492 Some(300),
1493 "base64",
1494 ),
1495 vec![required_secret("signing_secret", "webhook")],
1496 ProviderRuntimeMetadata::Builtin {
1497 connector: "webhook".to_string(),
1498 default_signature_variant: Some("standard".to_string()),
1499 },
1500 ),
1501 normalize: webhook_payload,
1502 }),
1503 Arc::new(BuiltinProviderSchema {
1504 provider_id: "a2a-push",
1505 harn_schema_name: "A2aPushPayload",
1506 metadata: provider_metadata_entry(
1507 "a2a-push",
1508 &["a2a-push"],
1509 "A2aPushPayload",
1510 &[],
1511 SignatureVerificationMetadata::None,
1512 Vec::new(),
1513 ProviderRuntimeMetadata::Builtin {
1514 connector: "a2a-push".to_string(),
1515 default_signature_variant: None,
1516 },
1517 ),
1518 normalize: a2a_push_payload,
1519 }),
1520 Arc::new(stream_provider_schema("kafka", kafka_payload)),
1521 Arc::new(stream_provider_schema("nats", nats_payload)),
1522 Arc::new(stream_provider_schema("pulsar", pulsar_payload)),
1523 Arc::new(stream_provider_schema("postgres-cdc", postgres_cdc_payload)),
1524 Arc::new(stream_provider_schema("email", email_payload)),
1525 Arc::new(stream_provider_schema("websocket", websocket_payload)),
1526 ]
1527}
1528
1529fn stream_provider_schema(
1530 provider_id: &'static str,
1531 normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1532) -> BuiltinProviderSchema {
1533 BuiltinProviderSchema {
1534 provider_id,
1535 harn_schema_name: "StreamEventPayload",
1536 metadata: provider_metadata_entry(
1537 provider_id,
1538 &["stream"],
1539 "StreamEventPayload",
1540 &[],
1541 SignatureVerificationMetadata::None,
1542 Vec::new(),
1543 ProviderRuntimeMetadata::Builtin {
1544 connector: "stream".to_string(),
1545 default_signature_variant: None,
1546 },
1547 ),
1548 normalize,
1549 }
1550}
1551
1552fn github_payload(
1553 kind: &str,
1554 headers: &BTreeMap<String, String>,
1555 raw: JsonValue,
1556) -> ProviderPayload {
1557 let original_raw = raw
1562 .get("raw")
1563 .filter(|value| value.is_object())
1564 .cloned()
1565 .unwrap_or_else(|| raw.clone());
1566 let common = GitHubEventCommon {
1567 event: kind.to_string(),
1568 action: raw
1569 .get("action")
1570 .and_then(JsonValue::as_str)
1571 .map(ToString::to_string),
1572 delivery_id: raw
1573 .get("delivery_id")
1574 .and_then(JsonValue::as_str)
1575 .map(ToString::to_string)
1576 .or_else(|| headers.get("X-GitHub-Delivery").cloned()),
1577 installation_id: raw
1578 .get("installation_id")
1579 .and_then(JsonValue::as_i64)
1580 .or_else(|| {
1581 raw.get("installation")
1582 .and_then(|value| value.get("id"))
1583 .and_then(JsonValue::as_i64)
1584 }),
1585 topic: raw
1586 .get("topic")
1587 .and_then(JsonValue::as_str)
1588 .map(ToString::to_string),
1589 repository: raw.get("repository").cloned(),
1590 repo: raw.get("repo").cloned(),
1591 raw: original_raw,
1592 };
1593 let payload = match kind {
1594 "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1595 common,
1596 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1597 }),
1598 "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1599 common,
1600 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1601 }),
1602 "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1603 common,
1604 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1605 comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1606 }),
1607 "pull_request_review" => {
1608 GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1609 common,
1610 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1611 review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1612 })
1613 }
1614 "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1615 common,
1616 commits: raw
1617 .get("commits")
1618 .and_then(JsonValue::as_array)
1619 .cloned()
1620 .unwrap_or_default(),
1621 distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1622 }),
1623 "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1624 common,
1625 workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1626 }),
1627 "deployment_status" => {
1628 GitHubEventPayload::DeploymentStatus(GitHubDeploymentStatusEventPayload {
1629 common,
1630 deployment_status: raw
1631 .get("deployment_status")
1632 .cloned()
1633 .unwrap_or(JsonValue::Null),
1634 deployment: raw.get("deployment").cloned().unwrap_or(JsonValue::Null),
1635 })
1636 }
1637 "check_run" => GitHubEventPayload::CheckRun(GitHubCheckRunEventPayload {
1638 common,
1639 check_run: raw.get("check_run").cloned().unwrap_or(JsonValue::Null),
1640 }),
1641 "check_suite" => {
1642 let check_suite = raw.get("check_suite").cloned().unwrap_or(JsonValue::Null);
1643 GitHubEventPayload::CheckSuite(GitHubCheckSuiteEventPayload {
1644 check_suite_id: github_promoted_i64(&raw, "check_suite_id")
1645 .or_else(|| check_suite.get("id").and_then(JsonValue::as_i64)),
1646 pull_request_number: github_promoted_i64(&raw, "pull_request_number"),
1647 head_sha: github_promoted_string(&raw, "head_sha"),
1648 head_ref: github_promoted_string(&raw, "head_ref"),
1649 base_ref: github_promoted_string(&raw, "base_ref"),
1650 status: github_promoted_string(&raw, "status"),
1651 conclusion: github_promoted_string(&raw, "conclusion"),
1652 common,
1653 check_suite,
1654 })
1655 }
1656 "status" => GitHubEventPayload::Status(GitHubStatusEventPayload {
1657 commit_status: raw
1658 .get("commit_status")
1659 .cloned()
1660 .or_else(|| Some(common.raw.clone())),
1661 status_id: github_promoted_i64(&raw, "status_id")
1662 .or_else(|| common.raw.get("id").and_then(JsonValue::as_i64)),
1663 head_sha: github_promoted_string(&raw, "head_sha").or_else(|| {
1664 common
1665 .raw
1666 .get("sha")
1667 .and_then(JsonValue::as_str)
1668 .map(ToString::to_string)
1669 }),
1670 head_ref: github_promoted_string(&raw, "head_ref"),
1671 base_ref: github_promoted_string(&raw, "base_ref"),
1672 state: github_promoted_string(&raw, "state"),
1673 context: github_promoted_string(&raw, "context"),
1674 target_url: github_promoted_string(&raw, "target_url"),
1675 common,
1676 }),
1677 "merge_group" => {
1678 let merge_group = raw.get("merge_group").cloned().unwrap_or(JsonValue::Null);
1679 GitHubEventPayload::MergeGroup(GitHubMergeGroupEventPayload {
1680 merge_group_id: raw
1681 .get("merge_group_id")
1682 .cloned()
1683 .or_else(|| merge_group.get("id").cloned()),
1684 head_sha: github_promoted_string(&raw, "head_sha").or_else(|| {
1685 merge_group
1686 .get("head_sha")
1687 .and_then(JsonValue::as_str)
1688 .map(ToString::to_string)
1689 }),
1690 head_ref: github_promoted_string(&raw, "head_ref").or_else(|| {
1691 merge_group
1692 .get("head_ref")
1693 .and_then(JsonValue::as_str)
1694 .map(ToString::to_string)
1695 }),
1696 base_sha: github_promoted_string(&raw, "base_sha").or_else(|| {
1697 merge_group
1698 .get("base_sha")
1699 .and_then(JsonValue::as_str)
1700 .map(ToString::to_string)
1701 }),
1702 base_ref: github_promoted_string(&raw, "base_ref").or_else(|| {
1703 merge_group
1704 .get("base_ref")
1705 .and_then(JsonValue::as_str)
1706 .map(ToString::to_string)
1707 }),
1708 pull_requests: raw
1709 .get("pull_requests")
1710 .and_then(JsonValue::as_array)
1711 .cloned()
1712 .unwrap_or_default(),
1713 pull_request_numbers: raw
1714 .get("pull_request_numbers")
1715 .and_then(JsonValue::as_array)
1716 .map(|values| {
1717 values
1718 .iter()
1719 .filter_map(JsonValue::as_i64)
1720 .collect::<Vec<_>>()
1721 })
1722 .unwrap_or_default(),
1723 common,
1724 merge_group,
1725 })
1726 }
1727 "installation" => GitHubEventPayload::Installation(GitHubInstallationEventPayload {
1728 installation: raw.get("installation").cloned(),
1729 account: raw.get("account").cloned(),
1730 installation_state: github_promoted_string(&raw, "installation_state"),
1731 suspended: raw.get("suspended").and_then(JsonValue::as_bool),
1732 revoked: raw.get("revoked").and_then(JsonValue::as_bool),
1733 repositories: raw
1734 .get("repositories")
1735 .and_then(JsonValue::as_array)
1736 .cloned()
1737 .unwrap_or_default(),
1738 common,
1739 }),
1740 "installation_repositories" => GitHubEventPayload::InstallationRepositories(
1741 GitHubInstallationRepositoriesEventPayload {
1742 installation: raw.get("installation").cloned(),
1743 account: raw.get("account").cloned(),
1744 installation_state: github_promoted_string(&raw, "installation_state"),
1745 suspended: raw.get("suspended").and_then(JsonValue::as_bool),
1746 revoked: raw.get("revoked").and_then(JsonValue::as_bool),
1747 repository_selection: github_promoted_string(&raw, "repository_selection"),
1748 repositories_added: raw
1749 .get("repositories_added")
1750 .and_then(JsonValue::as_array)
1751 .cloned()
1752 .unwrap_or_default(),
1753 repositories_removed: raw
1754 .get("repositories_removed")
1755 .and_then(JsonValue::as_array)
1756 .cloned()
1757 .unwrap_or_default(),
1758 common,
1759 },
1760 ),
1761 _ => GitHubEventPayload::Other(common),
1762 };
1763 ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1764}
1765
1766fn github_promoted_string(raw: &JsonValue, field: &str) -> Option<String> {
1767 raw.get(field)
1768 .and_then(JsonValue::as_str)
1769 .map(ToString::to_string)
1770}
1771
1772fn github_promoted_i64(raw: &JsonValue, field: &str) -> Option<i64> {
1773 raw.get(field).and_then(JsonValue::as_i64)
1774}
1775
1776fn slack_payload(
1777 kind: &str,
1778 _headers: &BTreeMap<String, String>,
1779 raw: JsonValue,
1780) -> ProviderPayload {
1781 let event = raw.get("event");
1782 let common = SlackEventCommon {
1783 event: kind.to_string(),
1784 event_id: raw
1785 .get("event_id")
1786 .and_then(JsonValue::as_str)
1787 .map(ToString::to_string),
1788 api_app_id: raw
1789 .get("api_app_id")
1790 .and_then(JsonValue::as_str)
1791 .map(ToString::to_string),
1792 team_id: raw
1793 .get("team_id")
1794 .and_then(JsonValue::as_str)
1795 .map(ToString::to_string),
1796 channel_id: slack_channel_id(event),
1797 user_id: slack_user_id(event),
1798 event_ts: event
1799 .and_then(|value| value.get("event_ts"))
1800 .and_then(JsonValue::as_str)
1801 .map(ToString::to_string),
1802 raw: raw.clone(),
1803 };
1804 let payload = match kind {
1805 kind if kind == "message" || kind.starts_with("message.") => {
1806 SlackEventPayload::Message(SlackMessageEventPayload {
1807 subtype: event
1808 .and_then(|value| value.get("subtype"))
1809 .and_then(JsonValue::as_str)
1810 .map(ToString::to_string),
1811 channel_type: event
1812 .and_then(|value| value.get("channel_type"))
1813 .and_then(JsonValue::as_str)
1814 .map(ToString::to_string),
1815 channel: event
1816 .and_then(|value| value.get("channel"))
1817 .and_then(JsonValue::as_str)
1818 .map(ToString::to_string),
1819 user: event
1820 .and_then(|value| value.get("user"))
1821 .and_then(JsonValue::as_str)
1822 .map(ToString::to_string),
1823 text: event
1824 .and_then(|value| value.get("text"))
1825 .and_then(JsonValue::as_str)
1826 .map(ToString::to_string),
1827 ts: event
1828 .and_then(|value| value.get("ts"))
1829 .and_then(JsonValue::as_str)
1830 .map(ToString::to_string),
1831 thread_ts: event
1832 .and_then(|value| value.get("thread_ts"))
1833 .and_then(JsonValue::as_str)
1834 .map(ToString::to_string),
1835 common,
1836 })
1837 }
1838 "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1839 channel: event
1840 .and_then(|value| value.get("channel"))
1841 .and_then(JsonValue::as_str)
1842 .map(ToString::to_string),
1843 user: event
1844 .and_then(|value| value.get("user"))
1845 .and_then(JsonValue::as_str)
1846 .map(ToString::to_string),
1847 text: event
1848 .and_then(|value| value.get("text"))
1849 .and_then(JsonValue::as_str)
1850 .map(ToString::to_string),
1851 ts: event
1852 .and_then(|value| value.get("ts"))
1853 .and_then(JsonValue::as_str)
1854 .map(ToString::to_string),
1855 thread_ts: event
1856 .and_then(|value| value.get("thread_ts"))
1857 .and_then(JsonValue::as_str)
1858 .map(ToString::to_string),
1859 common,
1860 }),
1861 "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1862 reaction: event
1863 .and_then(|value| value.get("reaction"))
1864 .and_then(JsonValue::as_str)
1865 .map(ToString::to_string),
1866 item_user: event
1867 .and_then(|value| value.get("item_user"))
1868 .and_then(JsonValue::as_str)
1869 .map(ToString::to_string),
1870 item: event
1871 .and_then(|value| value.get("item"))
1872 .cloned()
1873 .unwrap_or(JsonValue::Null),
1874 common,
1875 }),
1876 "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1877 user: event
1878 .and_then(|value| value.get("user"))
1879 .and_then(JsonValue::as_str)
1880 .map(ToString::to_string),
1881 channel: event
1882 .and_then(|value| value.get("channel"))
1883 .and_then(JsonValue::as_str)
1884 .map(ToString::to_string),
1885 tab: event
1886 .and_then(|value| value.get("tab"))
1887 .and_then(JsonValue::as_str)
1888 .map(ToString::to_string),
1889 view: event
1890 .and_then(|value| value.get("view"))
1891 .cloned()
1892 .unwrap_or(JsonValue::Null),
1893 common,
1894 }),
1895 "assistant_thread_started" => {
1896 let assistant_thread = event
1897 .and_then(|value| value.get("assistant_thread"))
1898 .cloned()
1899 .unwrap_or(JsonValue::Null);
1900 SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1901 thread_ts: assistant_thread
1902 .get("thread_ts")
1903 .and_then(JsonValue::as_str)
1904 .map(ToString::to_string),
1905 context: assistant_thread
1906 .get("context")
1907 .cloned()
1908 .unwrap_or(JsonValue::Null),
1909 assistant_thread,
1910 common,
1911 })
1912 }
1913 _ => SlackEventPayload::Other(common),
1914 };
1915 ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1916}
1917
1918fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1919 event
1920 .and_then(|value| value.get("channel"))
1921 .and_then(JsonValue::as_str)
1922 .map(ToString::to_string)
1923 .or_else(|| {
1924 event
1925 .and_then(|value| value.get("item"))
1926 .and_then(|value| value.get("channel"))
1927 .and_then(JsonValue::as_str)
1928 .map(ToString::to_string)
1929 })
1930 .or_else(|| {
1931 event
1932 .and_then(|value| value.get("channel"))
1933 .and_then(|value| value.get("id"))
1934 .and_then(JsonValue::as_str)
1935 .map(ToString::to_string)
1936 })
1937 .or_else(|| {
1938 event
1939 .and_then(|value| value.get("assistant_thread"))
1940 .and_then(|value| value.get("channel_id"))
1941 .and_then(JsonValue::as_str)
1942 .map(ToString::to_string)
1943 })
1944}
1945
1946fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1947 event
1948 .and_then(|value| value.get("user"))
1949 .and_then(JsonValue::as_str)
1950 .map(ToString::to_string)
1951 .or_else(|| {
1952 event
1953 .and_then(|value| value.get("user"))
1954 .and_then(|value| value.get("id"))
1955 .and_then(JsonValue::as_str)
1956 .map(ToString::to_string)
1957 })
1958 .or_else(|| {
1959 event
1960 .and_then(|value| value.get("item_user"))
1961 .and_then(JsonValue::as_str)
1962 .map(ToString::to_string)
1963 })
1964 .or_else(|| {
1965 event
1966 .and_then(|value| value.get("assistant_thread"))
1967 .and_then(|value| value.get("user_id"))
1968 .and_then(JsonValue::as_str)
1969 .map(ToString::to_string)
1970 })
1971}
1972
1973fn linear_payload(
1974 _kind: &str,
1975 headers: &BTreeMap<String, String>,
1976 raw: JsonValue,
1977) -> ProviderPayload {
1978 let common = linear_event_common(headers, &raw);
1979 let event = common.event.clone();
1980 let payload = match event.as_str() {
1981 "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1982 common,
1983 issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1984 changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1985 }),
1986 "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1987 common,
1988 comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1989 }),
1990 "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1991 common,
1992 label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1993 }),
1994 "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1995 common,
1996 project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1997 }),
1998 "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1999 common,
2000 cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
2001 }),
2002 "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
2003 common,
2004 customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
2005 }),
2006 "customer_request" => {
2007 LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
2008 common,
2009 customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
2010 })
2011 }
2012 _ => LinearEventPayload::Other(common),
2013 };
2014 ProviderPayload::Known(KnownProviderPayload::Linear(payload))
2015}
2016
2017fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
2018 LinearEventCommon {
2019 event: linear_event_name(
2020 raw.get("type")
2021 .and_then(JsonValue::as_str)
2022 .or_else(|| headers.get("Linear-Event").map(String::as_str)),
2023 ),
2024 action: raw
2025 .get("action")
2026 .and_then(JsonValue::as_str)
2027 .map(ToString::to_string),
2028 delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
2029 organization_id: raw
2030 .get("organizationId")
2031 .and_then(JsonValue::as_str)
2032 .map(ToString::to_string),
2033 webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
2034 webhook_id: raw
2035 .get("webhookId")
2036 .and_then(JsonValue::as_str)
2037 .map(ToString::to_string),
2038 url: raw
2039 .get("url")
2040 .and_then(JsonValue::as_str)
2041 .map(ToString::to_string),
2042 created_at: raw
2043 .get("createdAt")
2044 .and_then(JsonValue::as_str)
2045 .map(ToString::to_string),
2046 actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
2047 raw: raw.clone(),
2048 }
2049}
2050
/// Canonicalizes a Linear event type name.
///
/// The input is ASCII-lowercased, known aliases are collapsed (for example
/// `IssueComment` → `comment`, `ProjectUpdate` → `project`), any other
/// non-empty name is returned lowercased, and a missing or empty name becomes
/// `other`.
fn linear_event_name(raw_type: Option<&str>) -> String {
    let lowered = raw_type.map(str::to_ascii_lowercase).unwrap_or_default();
    let canonical = match lowered.as_str() {
        "" => "other",
        "comment" | "issuecomment" | "issue_comment" => "comment",
        "issuelabel" | "issue_label" => "issue_label",
        "project" | "projectupdate" | "project_update" => "project",
        "customerrequest" | "customer_request" => "customer_request",
        // Already-canonical names (`issue`, `cycle`, `customer`) and unknown
        // names pass through in lowercase.
        passthrough => passthrough,
    };
    canonical.to_string()
}
2064
2065fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
2066 let Some(JsonValue::Object(fields)) = updated_from else {
2067 return Vec::new();
2068 };
2069 let mut changes = Vec::new();
2070 for (field, previous) in fields {
2071 let change = match field.as_str() {
2072 "title" => LinearIssueChange::Title {
2073 previous: previous.as_str().map(ToString::to_string),
2074 },
2075 "description" => LinearIssueChange::Description {
2076 previous: previous.as_str().map(ToString::to_string),
2077 },
2078 "priority" => LinearIssueChange::Priority {
2079 previous: parse_json_i64ish(previous),
2080 },
2081 "estimate" => LinearIssueChange::Estimate {
2082 previous: parse_json_i64ish(previous),
2083 },
2084 "stateId" => LinearIssueChange::StateId {
2085 previous: previous.as_str().map(ToString::to_string),
2086 },
2087 "teamId" => LinearIssueChange::TeamId {
2088 previous: previous.as_str().map(ToString::to_string),
2089 },
2090 "assigneeId" => LinearIssueChange::AssigneeId {
2091 previous: previous.as_str().map(ToString::to_string),
2092 },
2093 "projectId" => LinearIssueChange::ProjectId {
2094 previous: previous.as_str().map(ToString::to_string),
2095 },
2096 "cycleId" => LinearIssueChange::CycleId {
2097 previous: previous.as_str().map(ToString::to_string),
2098 },
2099 "dueDate" => LinearIssueChange::DueDate {
2100 previous: previous.as_str().map(ToString::to_string),
2101 },
2102 "parentId" => LinearIssueChange::ParentId {
2103 previous: previous.as_str().map(ToString::to_string),
2104 },
2105 "sortOrder" => LinearIssueChange::SortOrder {
2106 previous: previous.as_f64(),
2107 },
2108 "labelIds" => LinearIssueChange::LabelIds {
2109 previous: parse_string_array(previous),
2110 },
2111 "completedAt" => LinearIssueChange::CompletedAt {
2112 previous: previous.as_str().map(ToString::to_string),
2113 },
2114 _ => LinearIssueChange::Other {
2115 field: field.clone(),
2116 previous: previous.clone(),
2117 },
2118 };
2119 changes.push(change);
2120 }
2121 changes
2122}
2123
2124fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
2125 value
2126 .as_i64()
2127 .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
2128 .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
2129}
2130
2131fn parse_string_array(value: &JsonValue) -> Vec<String> {
2132 let Some(array) = value.as_array() else {
2133 return Vec::new();
2134 };
2135 array
2136 .iter()
2137 .filter_map(|entry| {
2138 entry.as_str().map(ToString::to_string).or_else(|| {
2139 entry
2140 .get("id")
2141 .and_then(JsonValue::as_str)
2142 .map(ToString::to_string)
2143 })
2144 })
2145 .collect()
2146}
2147
/// Looks up a header by name, ignoring ASCII case.
///
/// Returns the value of the first key (in map iteration order) that matches
/// `name` case-insensitively, or `None` when no key matches.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
2154
2155fn notion_payload(
2156 kind: &str,
2157 headers: &BTreeMap<String, String>,
2158 raw: JsonValue,
2159) -> ProviderPayload {
2160 let workspace_id = raw
2161 .get("workspace_id")
2162 .and_then(JsonValue::as_str)
2163 .map(ToString::to_string);
2164 ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
2165 event: kind.to_string(),
2166 workspace_id,
2167 request_id: headers
2168 .get("request-id")
2169 .cloned()
2170 .or_else(|| headers.get("x-request-id").cloned()),
2171 subscription_id: raw
2172 .get("subscription_id")
2173 .and_then(JsonValue::as_str)
2174 .map(ToString::to_string),
2175 integration_id: raw
2176 .get("integration_id")
2177 .and_then(JsonValue::as_str)
2178 .map(ToString::to_string),
2179 attempt_number: raw
2180 .get("attempt_number")
2181 .and_then(JsonValue::as_u64)
2182 .and_then(|value| u32::try_from(value).ok()),
2183 entity_id: raw
2184 .get("entity")
2185 .and_then(|value| value.get("id"))
2186 .and_then(JsonValue::as_str)
2187 .map(ToString::to_string),
2188 entity_type: raw
2189 .get("entity")
2190 .and_then(|value| value.get("type"))
2191 .and_then(JsonValue::as_str)
2192 .map(ToString::to_string),
2193 api_version: raw
2194 .get("api_version")
2195 .and_then(JsonValue::as_str)
2196 .map(ToString::to_string),
2197 verification_token: raw
2198 .get("verification_token")
2199 .and_then(JsonValue::as_str)
2200 .map(ToString::to_string),
2201 polled: None,
2202 raw,
2203 })))
2204}
2205
2206fn cron_payload(
2207 _kind: &str,
2208 _headers: &BTreeMap<String, String>,
2209 raw: JsonValue,
2210) -> ProviderPayload {
2211 let cron_id = raw
2212 .get("cron_id")
2213 .and_then(JsonValue::as_str)
2214 .map(ToString::to_string);
2215 let schedule = raw
2216 .get("schedule")
2217 .and_then(JsonValue::as_str)
2218 .map(ToString::to_string);
2219 let tick_at = raw
2220 .get("tick_at")
2221 .and_then(JsonValue::as_str)
2222 .and_then(parse_rfc3339)
2223 .unwrap_or_else(OffsetDateTime::now_utc);
2224 ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
2225 cron_id,
2226 schedule,
2227 tick_at,
2228 raw,
2229 }))
2230}
2231
2232fn webhook_payload(
2233 _kind: &str,
2234 headers: &BTreeMap<String, String>,
2235 raw: JsonValue,
2236) -> ProviderPayload {
2237 ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
2238 source: headers.get("X-Webhook-Source").cloned(),
2239 content_type: headers.get("Content-Type").cloned(),
2240 raw,
2241 }))
2242}
2243
2244fn a2a_push_payload(
2245 _kind: &str,
2246 _headers: &BTreeMap<String, String>,
2247 raw: JsonValue,
2248) -> ProviderPayload {
2249 let task_id = raw
2250 .get("task_id")
2251 .and_then(JsonValue::as_str)
2252 .map(ToString::to_string);
2253 let sender = raw
2254 .get("sender")
2255 .and_then(JsonValue::as_str)
2256 .map(ToString::to_string);
2257 let task_state = raw
2258 .pointer("/status/state")
2259 .or_else(|| raw.pointer("/statusUpdate/status/state"))
2260 .and_then(JsonValue::as_str)
2261 .map(|state| match state {
2262 "cancelled" => "canceled".to_string(),
2263 other => other.to_string(),
2264 });
2265 let artifact = raw
2266 .pointer("/artifactUpdate/artifact")
2267 .or_else(|| raw.get("artifact"))
2268 .cloned();
2269 let kind = task_state
2270 .as_deref()
2271 .map(|state| format!("a2a.task.{state}"))
2272 .unwrap_or_else(|| "a2a.task.update".to_string());
2273 ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
2274 task_id,
2275 task_state,
2276 artifact,
2277 sender,
2278 raw,
2279 kind,
2280 }))
2281}
2282
2283fn kafka_payload(
2284 kind: &str,
2285 headers: &BTreeMap<String, String>,
2286 raw: JsonValue,
2287) -> ProviderPayload {
2288 ProviderPayload::Known(KnownProviderPayload::Kafka(stream_payload(
2289 kind, headers, raw,
2290 )))
2291}
2292
2293fn nats_payload(kind: &str, headers: &BTreeMap<String, String>, raw: JsonValue) -> ProviderPayload {
2294 ProviderPayload::Known(KnownProviderPayload::Nats(stream_payload(
2295 kind, headers, raw,
2296 )))
2297}
2298
2299fn pulsar_payload(
2300 kind: &str,
2301 headers: &BTreeMap<String, String>,
2302 raw: JsonValue,
2303) -> ProviderPayload {
2304 ProviderPayload::Known(KnownProviderPayload::Pulsar(stream_payload(
2305 kind, headers, raw,
2306 )))
2307}
2308
2309fn postgres_cdc_payload(
2310 kind: &str,
2311 headers: &BTreeMap<String, String>,
2312 raw: JsonValue,
2313) -> ProviderPayload {
2314 ProviderPayload::Known(KnownProviderPayload::PostgresCdc(stream_payload(
2315 kind, headers, raw,
2316 )))
2317}
2318
2319fn email_payload(
2320 kind: &str,
2321 headers: &BTreeMap<String, String>,
2322 raw: JsonValue,
2323) -> ProviderPayload {
2324 ProviderPayload::Known(KnownProviderPayload::Email(stream_payload(
2325 kind, headers, raw,
2326 )))
2327}
2328
2329fn websocket_payload(
2330 kind: &str,
2331 headers: &BTreeMap<String, String>,
2332 raw: JsonValue,
2333) -> ProviderPayload {
2334 ProviderPayload::Known(KnownProviderPayload::Websocket(stream_payload(
2335 kind, headers, raw,
2336 )))
2337}
2338
2339fn stream_payload(
2340 kind: &str,
2341 headers: &BTreeMap<String, String>,
2342 raw: JsonValue,
2343) -> StreamEventPayload {
2344 StreamEventPayload {
2345 event: kind.to_string(),
2346 source: json_stringish(&raw, &["source", "connector", "origin"]),
2347 stream: json_stringish(
2348 &raw,
2349 &["stream", "topic", "subject", "channel", "mailbox", "slot"],
2350 ),
2351 partition: json_stringish(&raw, &["partition", "shard", "consumer"]),
2352 offset: json_stringish(&raw, &["offset", "sequence", "lsn", "message_id"]),
2353 key: json_stringish(&raw, &["key", "message_key", "id", "event_id"]),
2354 timestamp: json_stringish(&raw, &["timestamp", "occurred_at", "received_at", "ts"]),
2355 headers: headers.clone(),
2356 raw,
2357 }
2358}
2359
2360fn json_stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
2361 fields.iter().find_map(|field| {
2362 let value = raw.get(*field)?;
2363 value
2364 .as_str()
2365 .map(ToString::to_string)
2366 .or_else(|| parse_json_i64ish(value).map(|number| number.to_string()))
2367 .or_else(|| value.as_u64().map(|number| number.to_string()))
2368 })
2369}
2370
2371fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
2372 OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
2373}
2374
2375#[cfg(test)]
2376mod tests {
2377 use super::*;
2378
    /// Test-only provider schema that owns its metadata instead of borrowing
    /// `'static` strings, so provider ids can be chosen per test.
    struct OwnedProviderSchema {
        /// Metadata returned by `metadata()` and used to answer the id and
        /// schema-name accessors.
        metadata: ProviderMetadata,
    }
2382
2383 impl OwnedProviderSchema {
2384 fn new(provider: &str, schema_name: &str) -> Self {
2385 Self {
2386 metadata: ProviderMetadata {
2387 provider: provider.to_string(),
2388 kinds: vec!["webhook".to_string()],
2389 schema_name: schema_name.to_string(),
2390 runtime: ProviderRuntimeMetadata::Placeholder,
2391 ..ProviderMetadata::default()
2392 },
2393 }
2394 }
2395 }
2396
2397 impl ProviderSchema for OwnedProviderSchema {
2398 fn provider_id(&self) -> &str {
2399 &self.metadata.provider
2400 }
2401
2402 fn harn_schema_name(&self) -> &str {
2403 &self.metadata.schema_name
2404 }
2405
2406 fn metadata(&self) -> ProviderMetadata {
2407 self.metadata.clone()
2408 }
2409
2410 fn normalize(
2411 &self,
2412 _kind: &str,
2413 _headers: &BTreeMap<String, String>,
2414 raw: JsonValue,
2415 ) -> Result<ProviderPayload, ProviderCatalogError> {
2416 Ok(ProviderPayload::Extension(ExtensionProviderPayload {
2417 provider: self.metadata.provider.clone(),
2418 schema_name: self.metadata.schema_name.clone(),
2419 raw,
2420 }))
2421 }
2422 }
2423
2424 fn owned_provider_schema(provider: &str, schema_name: &str) -> Arc<dyn ProviderSchema> {
2425 Arc::new(OwnedProviderSchema::new(provider, schema_name))
2426 }
2427
2428 fn sample_headers() -> BTreeMap<String, String> {
2429 BTreeMap::from([
2430 ("Authorization".to_string(), "Bearer secret".to_string()),
2431 ("Cookie".to_string(), "session=abc".to_string()),
2432 ("User-Agent".to_string(), "GitHub-Hookshot/123".to_string()),
2433 ("X-GitHub-Delivery".to_string(), "delivery-123".to_string()),
2434 ("X-GitHub-Event".to_string(), "issues".to_string()),
2435 ("X-Webhook-Token".to_string(), "token".to_string()),
2436 ])
2437 }
2438
2439 #[test]
2440 fn default_redaction_policy_keeps_safe_headers() {
2441 let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
2442 assert_eq!(redacted.get("User-Agent").unwrap(), "GitHub-Hookshot/123");
2443 assert_eq!(redacted.get("X-GitHub-Delivery").unwrap(), "delivery-123");
2444 assert_eq!(
2445 redacted.get("Authorization").unwrap(),
2446 REDACTED_HEADER_VALUE
2447 );
2448 assert_eq!(redacted.get("Cookie").unwrap(), REDACTED_HEADER_VALUE);
2449 assert_eq!(
2450 redacted.get("X-Webhook-Token").unwrap(),
2451 REDACTED_HEADER_VALUE
2452 );
2453 }
2454
2455 #[test]
2456 fn provider_catalog_rejects_duplicates() {
2457 let mut catalog = ProviderCatalog::default();
2458 catalog
2459 .register(Arc::new(BuiltinProviderSchema {
2460 provider_id: "github",
2461 harn_schema_name: "GitHubEventPayload",
2462 metadata: provider_metadata_entry(
2463 "github",
2464 &["webhook"],
2465 "GitHubEventPayload",
2466 &[],
2467 SignatureVerificationMetadata::None,
2468 Vec::new(),
2469 ProviderRuntimeMetadata::Placeholder,
2470 ),
2471 normalize: github_payload,
2472 }))
2473 .unwrap();
2474 let error = catalog
2475 .register(Arc::new(BuiltinProviderSchema {
2476 provider_id: "github",
2477 harn_schema_name: "GitHubEventPayload",
2478 metadata: provider_metadata_entry(
2479 "github",
2480 &["webhook"],
2481 "GitHubEventPayload",
2482 &[],
2483 SignatureVerificationMetadata::None,
2484 Vec::new(),
2485 ProviderRuntimeMetadata::Placeholder,
2486 ),
2487 normalize: github_payload,
2488 }))
2489 .unwrap_err();
2490 assert_eq!(
2491 error,
2492 ProviderCatalogError::DuplicateProvider("github".to_string())
2493 );
2494 }
2495
2496 #[test]
2497 fn provider_catalog_builds_independent_owned_dynamic_catalogs() {
2498 let first = ProviderCatalog::with_defaults_and(vec![owned_provider_schema(
2499 "runtime-a",
2500 "RuntimeAPayload",
2501 )])
2502 .unwrap();
2503 assert_eq!(
2504 first
2505 .metadata_for("runtime-a")
2506 .expect("first dynamic provider")
2507 .schema_name,
2508 "RuntimeAPayload"
2509 );
2510 assert!(first.metadata_for("runtime-b").is_none());
2511
2512 let second = ProviderCatalog::with_defaults_and(vec![owned_provider_schema(
2513 "runtime-b",
2514 "RuntimeBPayload",
2515 )])
2516 .unwrap();
2517 assert!(second.metadata_for("runtime-a").is_none());
2518 assert_eq!(
2519 second
2520 .metadata_for("runtime-b")
2521 .expect("second dynamic provider")
2522 .schema_name,
2523 "RuntimeBPayload"
2524 );
2525 assert!(first.metadata_for("runtime-a").is_some());
2526 }
2527
2528 #[test]
2529 fn registered_provider_metadata_marks_builtin_connectors() {
2530 let entries = registered_provider_metadata();
2531 let builtin: Vec<&ProviderMetadata> = entries
2532 .iter()
2533 .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
2534 .collect();
2535
2536 assert_eq!(builtin.len(), 9);
2537 assert!(builtin.iter().any(|entry| entry.provider == "a2a-push"));
2538 assert!(builtin.iter().any(|entry| entry.provider == "cron"));
2539 assert!(builtin.iter().any(|entry| entry.provider == "webhook"));
2540 for provider in ["github", "linear", "notion", "slack"] {
2541 let entry = entries
2542 .iter()
2543 .find(|entry| entry.provider == provider)
2544 .expect("first-party package-backed provider metadata");
2545 assert!(matches!(
2546 entry.runtime,
2547 ProviderRuntimeMetadata::Placeholder
2548 ));
2549 }
2550 let kafka = entries
2551 .iter()
2552 .find(|entry| entry.provider == "kafka")
2553 .expect("kafka stream provider");
2554 assert_eq!(kafka.kinds, vec!["stream".to_string()]);
2555 assert_eq!(kafka.schema_name, "StreamEventPayload");
2556 assert!(matches!(
2557 kafka.runtime,
2558 ProviderRuntimeMetadata::Builtin {
2559 ref connector,
2560 default_signature_variant: None
2561 } if connector == "stream"
2562 ));
2563 }
2564
2565 #[test]
2566 fn trigger_event_round_trip_is_stable() {
2567 let provider = ProviderId::from("github");
2568 let headers = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
2569 let payload = ProviderPayload::normalize(
2570 &provider,
2571 "issues",
2572 &sample_headers(),
2573 serde_json::json!({
2574 "action": "opened",
2575 "installation": {"id": 42},
2576 "issue": {"number": 99}
2577 }),
2578 )
2579 .unwrap();
2580 let event = TriggerEvent {
2581 id: TriggerEventId("trigger_evt_fixed".to_string()),
2582 provider,
2583 kind: "issues".to_string(),
2584 received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
2585 occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
2586 dedupe_key: "delivery-123".to_string(),
2587 trace_id: TraceId("trace_fixed".to_string()),
2588 tenant_id: Some(TenantId("tenant_1".to_string())),
2589 headers,
2590 provider_payload: payload,
2591 signature_status: SignatureStatus::Verified,
2592 dedupe_claimed: false,
2593 batch: None,
2594 raw_body: Some(vec![0, 159, 255, 10]),
2595 };
2596
2597 let once = serde_json::to_value(&event).unwrap();
2598 assert_eq!(once["raw_body"], serde_json::json!("AJ//Cg=="));
2599 let decoded: TriggerEvent = serde_json::from_value(once.clone()).unwrap();
2600 let twice = serde_json::to_value(&decoded).unwrap();
2601 assert_eq!(decoded, event);
2602 assert_eq!(once, twice);
2603 }
2604
2605 #[test]
2606 fn unknown_provider_errors() {
2607 let error = ProviderPayload::normalize(
2608 &ProviderId::from("custom-provider"),
2609 "thing.happened",
2610 &BTreeMap::new(),
2611 serde_json::json!({"ok": true}),
2612 )
2613 .unwrap_err();
2614 assert_eq!(
2615 error,
2616 ProviderCatalogError::UnknownProvider("custom-provider".to_string())
2617 );
2618 }
2619
2620 fn github_headers(event: &str, delivery: &str) -> BTreeMap<String, String> {
2621 BTreeMap::from([
2622 ("X-GitHub-Event".to_string(), event.to_string()),
2623 ("X-GitHub-Delivery".to_string(), delivery.to_string()),
2624 ])
2625 }
2626
2627 fn unwrap_github(payload: ProviderPayload) -> GitHubEventPayload {
2628 match payload {
2629 ProviderPayload::Known(KnownProviderPayload::GitHub(p)) => p,
2630 other => panic!("expected GitHub payload, got {other:?}"),
2631 }
2632 }
2633
2634 fn connector_normalized(
2638 event: &str,
2639 delivery: &str,
2640 installation_id: i64,
2641 action: Option<&str>,
2642 original: serde_json::Value,
2643 promoted: serde_json::Value,
2644 ) -> serde_json::Value {
2645 let mut common = serde_json::json!({
2646 "provider": "github",
2647 "event": event,
2648 "topic": match action {
2649 Some(a) => format!("github.{event}.{a}"),
2650 None => format!("github.{event}"),
2651 },
2652 "delivery_id": delivery,
2653 "installation_id": installation_id,
2654 "repository": original.get("repository").cloned().unwrap_or(JsonValue::Null),
2655 "repo": serde_json::json!({"owner": "octo-org", "name": "octo-repo", "full_name": "octo-org/octo-repo"}),
2656 "raw": original,
2657 });
2658 if let Some(a) = action {
2659 common["action"] = serde_json::json!(a);
2660 }
2661 let common_obj = common.as_object_mut().unwrap();
2662 if let Some(promoted_obj) = promoted.as_object() {
2663 for (k, v) in promoted_obj {
2664 common_obj.insert(k.clone(), v.clone());
2665 }
2666 }
2667 common
2668 }
2669
2670 #[test]
2671 fn github_check_suite_event_promotes_typed_fields() {
2672 let original = serde_json::json!({
2673 "action": "requested",
2674 "check_suite": {
2675 "id": 8101,
2676 "status": "queued",
2677 "conclusion": null,
2678 "head_sha": "ccccccccccccccccccccccccccccccccccccccc1",
2679 "head_branch": "feature/x",
2680 },
2681 "repository": {"full_name": "octo-org/octo-repo"},
2682 "installation": {"id": 3001},
2683 });
2684 let normalized = connector_normalized(
2685 "check_suite",
2686 "delivery-cs",
2687 3001,
2688 Some("requested"),
2689 original.clone(),
2690 serde_json::json!({
2691 "check_suite": original["check_suite"].clone(),
2692 "check_suite_id": 8101,
2693 "head_sha": "ccccccccccccccccccccccccccccccccccccccc1",
2694 "head_ref": "feature/x",
2695 "status": "queued",
2696 }),
2697 );
2698 let provider = ProviderId::from("github");
2699 let payload = ProviderPayload::normalize(
2700 &provider,
2701 "check_suite",
2702 &github_headers("check_suite", "delivery-cs"),
2703 normalized,
2704 )
2705 .expect("check_suite payload");
2706 let GitHubEventPayload::CheckSuite(check_suite) = unwrap_github(payload) else {
2707 panic!("expected CheckSuite variant");
2708 };
2709 assert_eq!(check_suite.common.event, "check_suite");
2710 assert_eq!(check_suite.common.action.as_deref(), Some("requested"));
2711 assert_eq!(
2712 check_suite.common.delivery_id.as_deref(),
2713 Some("delivery-cs")
2714 );
2715 assert_eq!(check_suite.common.installation_id, Some(3001));
2716 assert_eq!(
2717 check_suite.common.topic.as_deref(),
2718 Some("github.check_suite.requested")
2719 );
2720 assert!(check_suite.common.repository.is_some());
2721 assert!(check_suite.common.repo.is_some());
2722 assert_eq!(check_suite.check_suite_id, Some(8101));
2723 assert_eq!(
2724 check_suite.head_sha.as_deref(),
2725 Some("ccccccccccccccccccccccccccccccccccccccc1")
2726 );
2727 assert_eq!(check_suite.head_ref.as_deref(), Some("feature/x"));
2728 assert_eq!(check_suite.status.as_deref(), Some("queued"));
2729 assert_eq!(check_suite.common.raw, original);
2731 }
2732
2733 #[test]
2734 fn github_status_event_promotes_typed_fields() {
2735 let original = serde_json::json!({
2736 "id": 9101,
2737 "sha": "ccccccccccccccccccccccccccccccccccccccc1",
2738 "state": "success",
2739 "context": "legacy/status",
2740 "target_url": "https://ci.example.test/octo-repo/9101",
2741 "branches": [{"name": "main"}],
2742 "repository": {"full_name": "octo-org/octo-repo"},
2743 "installation": {"id": 3001},
2744 });
2745 let normalized = connector_normalized(
2746 "status",
2747 "delivery-status",
2748 3001,
2749 None,
2750 original.clone(),
2751 serde_json::json!({
2752 "commit_status": original.clone(),
2753 "status_id": 9101,
2754 "head_sha": "ccccccccccccccccccccccccccccccccccccccc1",
2755 "head_ref": "main",
2756 "base_ref": "main",
2757 "state": "success",
2758 "context": "legacy/status",
2759 "target_url": "https://ci.example.test/octo-repo/9101",
2760 }),
2761 );
2762 let provider = ProviderId::from("github");
2763 let payload = ProviderPayload::normalize(
2764 &provider,
2765 "status",
2766 &github_headers("status", "delivery-status"),
2767 normalized,
2768 )
2769 .expect("status payload");
2770 let GitHubEventPayload::Status(status) = unwrap_github(payload) else {
2771 panic!("expected Status variant");
2772 };
2773 assert_eq!(status.common.event, "status");
2774 assert_eq!(status.common.installation_id, Some(3001));
2775 assert_eq!(status.status_id, Some(9101));
2776 assert_eq!(status.state.as_deref(), Some("success"));
2777 assert_eq!(status.context.as_deref(), Some("legacy/status"));
2778 assert_eq!(
2779 status.target_url.as_deref(),
2780 Some("https://ci.example.test/octo-repo/9101")
2781 );
2782 assert_eq!(
2783 status.head_sha.as_deref(),
2784 Some("ccccccccccccccccccccccccccccccccccccccc1")
2785 );
2786 assert!(status.commit_status.is_some());
2787 }
2788
2789 #[test]
2790 fn github_merge_group_event_promotes_typed_fields() {
2791 let original = serde_json::json!({
2792 "action": "checks_requested",
2793 "merge_group": {
2794 "id": 9201,
2795 "head_ref": "gh-readonly-queue/main/pr-42",
2796 "head_sha": "ddddddddddddddddddddddddddddddddddddddd1",
2797 "base_ref": "main",
2798 "base_sha": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa1",
2799 "pull_requests": [{"number": 42}, {"number": 43}],
2800 },
2801 "repository": {"full_name": "octo-org/octo-repo"},
2802 "installation": {"id": 3001},
2803 });
2804 let normalized = connector_normalized(
2805 "merge_group",
2806 "delivery-mg",
2807 3001,
2808 Some("checks_requested"),
2809 original.clone(),
2810 serde_json::json!({
2811 "merge_group": original["merge_group"].clone(),
2812 "merge_group_id": 9201,
2813 "head_sha": "ddddddddddddddddddddddddddddddddddddddd1",
2814 "head_ref": "gh-readonly-queue/main/pr-42",
2815 "base_sha": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa1",
2816 "base_ref": "main",
2817 "pull_requests": [{"number": 42}, {"number": 43}],
2818 "pull_request_numbers": [42, 43],
2819 }),
2820 );
2821 let provider = ProviderId::from("github");
2822 let payload = ProviderPayload::normalize(
2823 &provider,
2824 "merge_group",
2825 &github_headers("merge_group", "delivery-mg"),
2826 normalized,
2827 )
2828 .expect("merge_group payload");
2829 let GitHubEventPayload::MergeGroup(mg) = unwrap_github(payload) else {
2830 panic!("expected MergeGroup variant");
2831 };
2832 assert_eq!(mg.common.event, "merge_group");
2833 assert_eq!(mg.common.action.as_deref(), Some("checks_requested"));
2834 assert_eq!(mg.merge_group_id, Some(serde_json::json!(9201)));
2835 assert_eq!(mg.head_ref.as_deref(), Some("gh-readonly-queue/main/pr-42"));
2836 assert_eq!(
2837 mg.base_sha.as_deref(),
2838 Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa1")
2839 );
2840 assert_eq!(mg.base_ref.as_deref(), Some("main"));
2841 assert_eq!(mg.pull_request_numbers, vec![42i64, 43i64]);
2842 assert_eq!(mg.pull_requests.len(), 2);
2843 }
2844
2845 #[test]
2846 fn github_installation_event_promotes_typed_fields() {
2847 let original = serde_json::json!({
2848 "action": "suspend",
2849 "installation": {
2850 "id": 3001,
2851 "account": {"login": "octo-org"},
2852 "repository_selection": "selected",
2853 "suspended_at": "2026-04-20T18:00:00Z",
2854 },
2855 "repositories": [{"full_name": "octo-org/octo-repo"}],
2856 });
2857 let normalized = connector_normalized(
2858 "installation",
2859 "delivery-inst",
2860 3001,
2861 Some("suspend"),
2862 original.clone(),
2863 serde_json::json!({
2864 "installation": original["installation"].clone(),
2865 "account": {"login": "octo-org"},
2866 "installation_state": "suspended",
2867 "suspended": true,
2868 "revoked": false,
2869 "repositories": original["repositories"].clone(),
2870 }),
2871 );
2872 let provider = ProviderId::from("github");
2873 let payload = ProviderPayload::normalize(
2874 &provider,
2875 "installation",
2876 &github_headers("installation", "delivery-inst"),
2877 normalized,
2878 )
2879 .expect("installation payload");
2880 let GitHubEventPayload::Installation(inst) = unwrap_github(payload) else {
2881 panic!("expected Installation variant");
2882 };
2883 assert_eq!(inst.common.event, "installation");
2884 assert_eq!(inst.common.action.as_deref(), Some("suspend"));
2885 assert_eq!(inst.installation_state.as_deref(), Some("suspended"));
2886 assert_eq!(inst.suspended, Some(true));
2887 assert_eq!(inst.revoked, Some(false));
2888 assert_eq!(inst.repositories.len(), 1);
2889 assert!(inst.account.is_some());
2890 }
2891
2892 #[test]
2893 fn github_installation_repositories_event_promotes_typed_fields() {
2894 let original = serde_json::json!({
2895 "action": "removed",
2896 "installation": {"id": 3001, "account": {"login": "octo-org"}},
2897 "repository_selection": "selected",
2898 "repositories_added": [],
2899 "repositories_removed": [
2900 {"id": 4001, "full_name": "octo-org/octo-repo"},
2901 ],
2902 });
2903 let normalized = connector_normalized(
2904 "installation_repositories",
2905 "delivery-inst-repos",
2906 3001,
2907 Some("removed"),
2908 original.clone(),
2909 serde_json::json!({
2910 "installation": original["installation"].clone(),
2911 "account": {"login": "octo-org"},
2912 "installation_state": "revoked",
2913 "suspended": false,
2914 "revoked": true,
2915 "repository_selection": "selected",
2916 "repositories_added": [],
2917 "repositories_removed": original["repositories_removed"].clone(),
2918 }),
2919 );
2920 let provider = ProviderId::from("github");
2921 let payload = ProviderPayload::normalize(
2922 &provider,
2923 "installation_repositories",
2924 &github_headers("installation_repositories", "delivery-inst-repos"),
2925 normalized,
2926 )
2927 .expect("installation_repositories payload");
2928 let GitHubEventPayload::InstallationRepositories(repos) = unwrap_github(payload) else {
2929 panic!("expected InstallationRepositories variant");
2930 };
2931 assert_eq!(repos.common.event, "installation_repositories");
2932 assert_eq!(repos.common.action.as_deref(), Some("removed"));
2933 assert_eq!(repos.repository_selection.as_deref(), Some("selected"));
2934 assert!(repos.repositories_added.is_empty());
2935 assert_eq!(repos.repositories_removed.len(), 1);
2936 assert_eq!(
2937 repos.repositories_removed[0]
2938 .get("full_name")
2939 .and_then(|v| v.as_str()),
2940 Some("octo-org/octo-repo"),
2941 );
2942 assert_eq!(repos.installation_state.as_deref(), Some("revoked"));
2943 assert_eq!(repos.revoked, Some(true));
2944 }
2945
2946 #[test]
2947 fn github_legacy_direct_webhook_still_normalizes() {
2948 let provider = ProviderId::from("github");
2952 let payload = ProviderPayload::normalize(
2953 &provider,
2954 "issues",
2955 &github_headers("issues", "delivery-legacy"),
2956 serde_json::json!({
2957 "action": "opened",
2958 "installation": {"id": 99},
2959 "issue": {"number": 7},
2960 }),
2961 )
2962 .expect("legacy issues payload");
2963 let GitHubEventPayload::Issues(issues) = unwrap_github(payload) else {
2964 panic!("expected Issues variant");
2965 };
2966 assert_eq!(issues.common.installation_id, Some(99));
2967 assert_eq!(
2968 issues.common.delivery_id.as_deref(),
2969 Some("delivery-legacy")
2970 );
2971 assert!(issues.common.topic.is_none());
2972 assert!(issues.common.repo.is_none());
2973 assert_eq!(issues.issue.get("number").and_then(|v| v.as_i64()), Some(7));
2974 }
2975
2976 #[test]
2977 fn github_new_event_variants_round_trip_through_serde() {
2978 let provider = ProviderId::from("github");
2983 let cases: &[(&str, serde_json::Value, &str)] = &[
2984 (
2985 "check_suite",
2986 serde_json::json!({
2987 "event": "check_suite",
2988 "check_suite": {"id": 1},
2989 "check_suite_id": 1,
2990 "raw": {"check_suite": {"id": 1}},
2991 }),
2992 "CheckSuite",
2993 ),
2994 (
2995 "status",
2996 serde_json::json!({
2997 "event": "status",
2998 "commit_status": {"id": 9},
2999 "status_id": 9,
3000 "state": "success",
3001 "raw": {"id": 9, "state": "success"},
3002 }),
3003 "Status",
3004 ),
3005 (
3006 "merge_group",
3007 serde_json::json!({
3008 "event": "merge_group",
3009 "merge_group": {"id": 1},
3010 "merge_group_id": 1,
3011 "raw": {"merge_group": {"id": 1}},
3012 }),
3013 "MergeGroup",
3014 ),
3015 (
3016 "installation",
3017 serde_json::json!({
3018 "event": "installation",
3019 "installation": {"id": 1},
3020 "installation_state": "active",
3021 "suspended": false,
3022 "raw": {"installation": {"id": 1}},
3023 }),
3024 "Installation",
3025 ),
3026 (
3027 "installation_repositories",
3028 serde_json::json!({
3029 "event": "installation_repositories",
3030 "installation": {"id": 1},
3031 "repository_selection": "selected",
3032 "repositories_added": [],
3033 "repositories_removed": [{"id": 7}],
3034 "raw": {"installation": {"id": 1}},
3035 }),
3036 "InstallationRepositories",
3037 ),
3038 ];
3039 for (kind, raw, want_variant) in cases {
3040 let payload = ProviderPayload::normalize(
3041 &provider,
3042 kind,
3043 &github_headers(kind, "delivery"),
3044 raw.clone(),
3045 )
3046 .unwrap_or_else(|_| panic!("normalize {kind}"));
3047 let serialized = serde_json::to_value(&payload).expect("serialize");
3048 let deserialized: ProviderPayload =
3049 serde_json::from_value(serialized.clone()).expect("deserialize");
3050 let actual_variant = match unwrap_github(deserialized) {
3051 GitHubEventPayload::Issues(_) => "Issues",
3052 GitHubEventPayload::PullRequest(_) => "PullRequest",
3053 GitHubEventPayload::IssueComment(_) => "IssueComment",
3054 GitHubEventPayload::PullRequestReview(_) => "PullRequestReview",
3055 GitHubEventPayload::Push(_) => "Push",
3056 GitHubEventPayload::WorkflowRun(_) => "WorkflowRun",
3057 GitHubEventPayload::DeploymentStatus(_) => "DeploymentStatus",
3058 GitHubEventPayload::CheckRun(_) => "CheckRun",
3059 GitHubEventPayload::CheckSuite(_) => "CheckSuite",
3060 GitHubEventPayload::Status(_) => "Status",
3061 GitHubEventPayload::MergeGroup(_) => "MergeGroup",
3062 GitHubEventPayload::Installation(_) => "Installation",
3063 GitHubEventPayload::InstallationRepositories(_) => "InstallationRepositories",
3064 GitHubEventPayload::Other(_) => "Other",
3065 };
3066 assert_eq!(
3067 actual_variant, *want_variant,
3068 "{kind} round-tripped as {actual_variant}, expected {want_variant}; serialized form: {serialized}"
3069 );
3070 }
3071 }
3072
3073 #[test]
3074 fn provider_normalizes_stream_payloads() {
3075 let payload = ProviderPayload::normalize(
3076 &ProviderId::from("kafka"),
3077 "quote.tick",
3078 &BTreeMap::from([("x-source".to_string(), "feed".to_string())]),
3079 serde_json::json!({
3080 "topic": "quotes",
3081 "partition": 7,
3082 "offset": "42",
3083 "key": "AAPL",
3084 "timestamp": "2026-04-21T12:00:00Z"
3085 }),
3086 )
3087 .expect("stream payload");
3088 let ProviderPayload::Known(KnownProviderPayload::Kafka(payload)) = payload else {
3089 panic!("expected kafka stream payload")
3090 };
3091 assert_eq!(payload.event, "quote.tick");
3092 assert_eq!(payload.stream.as_deref(), Some("quotes"));
3093 assert_eq!(payload.partition.as_deref(), Some("7"));
3094 assert_eq!(payload.offset.as_deref(), Some("42"));
3095 assert_eq!(payload.key.as_deref(), Some("AAPL"));
3096 assert_eq!(payload.timestamp.as_deref(), Some("2026-04-21T12:00:00Z"));
3097 }
3098}