1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9#[cfg(test)]
10use crate::redact::REDACTED_HEADER_VALUE;
11use crate::triggers::test_util::clock;
12
13fn serialize_optional_bytes_b64<S>(
14 value: &Option<Vec<u8>>,
15 serializer: S,
16) -> Result<S::Ok, S::Error>
17where
18 S: Serializer,
19{
20 use base64::Engine;
21
22 match value {
23 Some(bytes) => {
24 serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
25 }
26 None => serializer.serialize_none(),
27 }
28}
29
30fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
31where
32 D: Deserializer<'de>,
33{
34 use base64::Engine;
35
36 let encoded = Option::<String>::deserialize(deserializer)?;
37 encoded
38 .map(|text| {
39 base64::engine::general_purpose::STANDARD
40 .decode(text.as_bytes())
41 .map_err(serde::de::Error::custom)
42 })
43 .transpose()
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
47#[serde(transparent)]
48pub struct TriggerEventId(pub String);
49
50impl TriggerEventId {
51 pub fn new() -> Self {
52 Self(format!("trigger_evt_{}", Uuid::now_v7()))
53 }
54}
55
56impl Default for TriggerEventId {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
63#[serde(transparent)]
64pub struct ProviderId(pub String);
65
66impl ProviderId {
67 pub fn new(value: impl Into<String>) -> Self {
68 Self(value.into())
69 }
70
71 pub fn as_str(&self) -> &str {
72 self.0.as_str()
73 }
74}
75
76impl From<&str> for ProviderId {
77 fn from(value: &str) -> Self {
78 Self::new(value)
79 }
80}
81
82impl From<String> for ProviderId {
83 fn from(value: String) -> Self {
84 Self::new(value)
85 }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
89#[serde(transparent)]
90pub struct TraceId(pub String);
91
92impl TraceId {
93 pub fn new() -> Self {
94 Self(format!("trace_{}", Uuid::now_v7()))
95 }
96}
97
98impl Default for TraceId {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
105#[serde(transparent)]
106pub struct TenantId(pub String);
107
108impl TenantId {
109 pub fn new(value: impl Into<String>) -> Self {
110 Self(value.into())
111 }
112}
113
114#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(tag = "state", rename_all = "snake_case")]
116pub enum SignatureStatus {
117 Verified,
118 Unsigned,
119 Failed { reason: String },
120}
121
122#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
123pub struct GitHubEventCommon {
124 pub event: String,
125 pub action: Option<String>,
126 pub delivery_id: Option<String>,
127 pub installation_id: Option<i64>,
128 #[serde(default, skip_serializing_if = "Option::is_none")]
129 pub topic: Option<String>,
130 #[serde(default, skip_serializing_if = "Option::is_none")]
131 pub repository: Option<JsonValue>,
132 #[serde(default, skip_serializing_if = "Option::is_none")]
133 pub repo: Option<JsonValue>,
134 pub raw: JsonValue,
135}
136
137#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
138pub struct GitHubIssuesEventPayload {
139 #[serde(flatten)]
140 pub common: GitHubEventCommon,
141 pub issue: JsonValue,
142}
143
144#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
145pub struct GitHubPullRequestEventPayload {
146 #[serde(flatten)]
147 pub common: GitHubEventCommon,
148 pub pull_request: JsonValue,
149}
150
151#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
152pub struct GitHubIssueCommentEventPayload {
153 #[serde(flatten)]
154 pub common: GitHubEventCommon,
155 pub issue: JsonValue,
156 pub comment: JsonValue,
157}
158
159#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
160pub struct GitHubPullRequestReviewEventPayload {
161 #[serde(flatten)]
162 pub common: GitHubEventCommon,
163 pub pull_request: JsonValue,
164 pub review: JsonValue,
165}
166
167#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
168pub struct GitHubPushEventPayload {
169 #[serde(flatten)]
170 pub common: GitHubEventCommon,
171 #[serde(default)]
172 pub commits: Vec<JsonValue>,
173 pub distinct_size: Option<i64>,
174}
175
176#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
177pub struct GitHubWorkflowRunEventPayload {
178 #[serde(flatten)]
179 pub common: GitHubEventCommon,
180 pub workflow_run: JsonValue,
181}
182
183#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
184pub struct GitHubDeploymentStatusEventPayload {
185 #[serde(flatten)]
186 pub common: GitHubEventCommon,
187 pub deployment_status: JsonValue,
188 pub deployment: JsonValue,
189}
190
191#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
192pub struct GitHubCheckRunEventPayload {
193 #[serde(flatten)]
194 pub common: GitHubEventCommon,
195 pub check_run: JsonValue,
196}
197
198#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
199pub struct GitHubCheckSuiteEventPayload {
200 #[serde(flatten)]
201 pub common: GitHubEventCommon,
202 pub check_suite: JsonValue,
203 #[serde(default, skip_serializing_if = "Option::is_none")]
204 pub check_suite_id: Option<i64>,
205 #[serde(default, skip_serializing_if = "Option::is_none")]
206 pub pull_request_number: Option<i64>,
207 #[serde(default, skip_serializing_if = "Option::is_none")]
208 pub head_sha: Option<String>,
209 #[serde(default, skip_serializing_if = "Option::is_none")]
210 pub head_ref: Option<String>,
211 #[serde(default, skip_serializing_if = "Option::is_none")]
212 pub base_ref: Option<String>,
213 #[serde(default, skip_serializing_if = "Option::is_none")]
214 pub status: Option<String>,
215 #[serde(default, skip_serializing_if = "Option::is_none")]
216 pub conclusion: Option<String>,
217}
218
219#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
220pub struct GitHubStatusEventPayload {
221 #[serde(flatten)]
222 pub common: GitHubEventCommon,
223 #[serde(default, skip_serializing_if = "Option::is_none")]
224 pub commit_status: Option<JsonValue>,
225 #[serde(default, skip_serializing_if = "Option::is_none")]
226 pub status_id: Option<i64>,
227 #[serde(default, skip_serializing_if = "Option::is_none")]
228 pub head_sha: Option<String>,
229 #[serde(default, skip_serializing_if = "Option::is_none")]
230 pub head_ref: Option<String>,
231 #[serde(default, skip_serializing_if = "Option::is_none")]
232 pub base_ref: Option<String>,
233 #[serde(default, skip_serializing_if = "Option::is_none")]
234 pub state: Option<String>,
235 #[serde(default, skip_serializing_if = "Option::is_none")]
236 pub context: Option<String>,
237 #[serde(default, skip_serializing_if = "Option::is_none")]
238 pub target_url: Option<String>,
239}
240
241#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
242pub struct GitHubMergeGroupEventPayload {
243 #[serde(flatten)]
244 pub common: GitHubEventCommon,
245 pub merge_group: JsonValue,
246 #[serde(default, skip_serializing_if = "Option::is_none")]
247 pub merge_group_id: Option<JsonValue>,
248 #[serde(default, skip_serializing_if = "Option::is_none")]
249 pub head_sha: Option<String>,
250 #[serde(default, skip_serializing_if = "Option::is_none")]
251 pub head_ref: Option<String>,
252 #[serde(default, skip_serializing_if = "Option::is_none")]
253 pub base_sha: Option<String>,
254 #[serde(default, skip_serializing_if = "Option::is_none")]
255 pub base_ref: Option<String>,
256 #[serde(default)]
257 pub pull_requests: Vec<JsonValue>,
258 #[serde(default)]
259 pub pull_request_numbers: Vec<i64>,
260}
261
262#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
263pub struct GitHubInstallationEventPayload {
264 #[serde(flatten)]
265 pub common: GitHubEventCommon,
266 #[serde(default, skip_serializing_if = "Option::is_none")]
267 pub installation: Option<JsonValue>,
268 #[serde(default, skip_serializing_if = "Option::is_none")]
269 pub account: Option<JsonValue>,
270 #[serde(default, skip_serializing_if = "Option::is_none")]
271 pub installation_state: Option<String>,
272 #[serde(default, skip_serializing_if = "Option::is_none")]
273 pub suspended: Option<bool>,
274 #[serde(default, skip_serializing_if = "Option::is_none")]
275 pub revoked: Option<bool>,
276 #[serde(default)]
277 pub repositories: Vec<JsonValue>,
278}
279
280#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
281pub struct GitHubInstallationRepositoriesEventPayload {
282 #[serde(flatten)]
283 pub common: GitHubEventCommon,
284 #[serde(default, skip_serializing_if = "Option::is_none")]
285 pub installation: Option<JsonValue>,
286 #[serde(default, skip_serializing_if = "Option::is_none")]
287 pub account: Option<JsonValue>,
288 #[serde(default, skip_serializing_if = "Option::is_none")]
289 pub installation_state: Option<String>,
290 #[serde(default, skip_serializing_if = "Option::is_none")]
291 pub suspended: Option<bool>,
292 #[serde(default, skip_serializing_if = "Option::is_none")]
293 pub revoked: Option<bool>,
294 #[serde(default, skip_serializing_if = "Option::is_none")]
295 pub repository_selection: Option<String>,
296 #[serde(default)]
297 pub repositories_added: Vec<JsonValue>,
298 #[serde(default)]
299 pub repositories_removed: Vec<JsonValue>,
300}
301
302#[derive(Clone, Debug, PartialEq, Serialize)]
303#[serde(untagged)]
304pub enum GitHubEventPayload {
305 Issues(GitHubIssuesEventPayload),
306 PullRequest(GitHubPullRequestEventPayload),
307 IssueComment(GitHubIssueCommentEventPayload),
308 PullRequestReview(GitHubPullRequestReviewEventPayload),
309 Push(GitHubPushEventPayload),
310 WorkflowRun(GitHubWorkflowRunEventPayload),
311 DeploymentStatus(GitHubDeploymentStatusEventPayload),
312 CheckRun(GitHubCheckRunEventPayload),
313 CheckSuite(GitHubCheckSuiteEventPayload),
314 Status(GitHubStatusEventPayload),
315 MergeGroup(GitHubMergeGroupEventPayload),
316 Installation(GitHubInstallationEventPayload),
317 InstallationRepositories(GitHubInstallationRepositoriesEventPayload),
318 Other(GitHubEventCommon),
319}
320
321impl<'de> Deserialize<'de> for GitHubEventPayload {
326 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
327 where
328 D: Deserializer<'de>,
329 {
330 let value = JsonValue::deserialize(deserializer)?;
331 let kind = value
332 .get("event")
333 .and_then(JsonValue::as_str)
334 .unwrap_or("")
335 .to_string();
336 let from_value = |v: JsonValue| -> Result<GitHubEventPayload, D::Error> {
337 let payload = match kind.as_str() {
338 "issues" => GitHubEventPayload::Issues(
339 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
340 ),
341 "pull_request" => GitHubEventPayload::PullRequest(
342 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
343 ),
344 "issue_comment" => GitHubEventPayload::IssueComment(
345 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
346 ),
347 "pull_request_review" => GitHubEventPayload::PullRequestReview(
348 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
349 ),
350 "push" => GitHubEventPayload::Push(
351 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
352 ),
353 "workflow_run" => GitHubEventPayload::WorkflowRun(
354 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
355 ),
356 "deployment_status" => GitHubEventPayload::DeploymentStatus(
357 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
358 ),
359 "check_run" => GitHubEventPayload::CheckRun(
360 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
361 ),
362 "check_suite" => GitHubEventPayload::CheckSuite(
363 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
364 ),
365 "status" => GitHubEventPayload::Status(
366 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
367 ),
368 "merge_group" => GitHubEventPayload::MergeGroup(
369 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
370 ),
371 "installation" => GitHubEventPayload::Installation(
372 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
373 ),
374 "installation_repositories" => GitHubEventPayload::InstallationRepositories(
375 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
376 ),
377 _ => GitHubEventPayload::Other(
378 serde_json::from_value(v).map_err(serde::de::Error::custom)?,
379 ),
380 };
381 Ok(payload)
382 };
383 from_value(value)
384 }
385}
386
387#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
388pub struct SlackEventCommon {
389 pub event: String,
390 pub event_id: Option<String>,
391 pub api_app_id: Option<String>,
392 pub team_id: Option<String>,
393 pub channel_id: Option<String>,
394 pub user_id: Option<String>,
395 pub event_ts: Option<String>,
396 pub raw: JsonValue,
397}
398
399#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
400pub struct SlackMessageEventPayload {
401 #[serde(flatten)]
402 pub common: SlackEventCommon,
403 pub subtype: Option<String>,
404 pub channel_type: Option<String>,
405 pub channel: Option<String>,
406 pub user: Option<String>,
407 pub text: Option<String>,
408 pub ts: Option<String>,
409 pub thread_ts: Option<String>,
410}
411
412#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
413pub struct SlackAppMentionEventPayload {
414 #[serde(flatten)]
415 pub common: SlackEventCommon,
416 pub channel: Option<String>,
417 pub user: Option<String>,
418 pub text: Option<String>,
419 pub ts: Option<String>,
420 pub thread_ts: Option<String>,
421}
422
423#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
424pub struct SlackReactionAddedEventPayload {
425 #[serde(flatten)]
426 pub common: SlackEventCommon,
427 pub reaction: Option<String>,
428 pub item_user: Option<String>,
429 pub item: JsonValue,
430}
431
432#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
433pub struct SlackAppHomeOpenedEventPayload {
434 #[serde(flatten)]
435 pub common: SlackEventCommon,
436 pub user: Option<String>,
437 pub channel: Option<String>,
438 pub tab: Option<String>,
439 pub view: JsonValue,
440}
441
442#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
443pub struct SlackAssistantThreadStartedEventPayload {
444 #[serde(flatten)]
445 pub common: SlackEventCommon,
446 pub assistant_thread: JsonValue,
447 pub thread_ts: Option<String>,
448 pub context: JsonValue,
449}
450
451#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
452#[serde(untagged)]
453pub enum SlackEventPayload {
454 Message(SlackMessageEventPayload),
455 AppMention(SlackAppMentionEventPayload),
456 ReactionAdded(SlackReactionAddedEventPayload),
457 AppHomeOpened(SlackAppHomeOpenedEventPayload),
458 AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
459 Other(SlackEventCommon),
460}
461
462#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
463pub struct LinearEventCommon {
464 pub event: String,
465 pub action: Option<String>,
466 pub delivery_id: Option<String>,
467 pub organization_id: Option<String>,
468 pub webhook_timestamp: Option<i64>,
469 pub webhook_id: Option<String>,
470 pub url: Option<String>,
471 pub created_at: Option<String>,
472 pub actor: JsonValue,
473 pub raw: JsonValue,
474}
475
476#[derive(Clone, Debug, PartialEq)]
477pub enum LinearIssueChange {
478 Title { previous: Option<String> },
479 Description { previous: Option<String> },
480 Priority { previous: Option<i64> },
481 Estimate { previous: Option<i64> },
482 StateId { previous: Option<String> },
483 TeamId { previous: Option<String> },
484 AssigneeId { previous: Option<String> },
485 ProjectId { previous: Option<String> },
486 CycleId { previous: Option<String> },
487 DueDate { previous: Option<String> },
488 ParentId { previous: Option<String> },
489 SortOrder { previous: Option<f64> },
490 LabelIds { previous: Vec<String> },
491 CompletedAt { previous: Option<String> },
492 Other { field: String, previous: JsonValue },
493}
494
495impl Serialize for LinearIssueChange {
496 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
497 where
498 S: Serializer,
499 {
500 let value = match self {
501 Self::Title { previous } => {
502 serde_json::json!({ "field_name": "title", "previous": previous })
503 }
504 Self::Description { previous } => {
505 serde_json::json!({ "field_name": "description", "previous": previous })
506 }
507 Self::Priority { previous } => {
508 serde_json::json!({ "field_name": "priority", "previous": previous })
509 }
510 Self::Estimate { previous } => {
511 serde_json::json!({ "field_name": "estimate", "previous": previous })
512 }
513 Self::StateId { previous } => {
514 serde_json::json!({ "field_name": "state_id", "previous": previous })
515 }
516 Self::TeamId { previous } => {
517 serde_json::json!({ "field_name": "team_id", "previous": previous })
518 }
519 Self::AssigneeId { previous } => {
520 serde_json::json!({ "field_name": "assignee_id", "previous": previous })
521 }
522 Self::ProjectId { previous } => {
523 serde_json::json!({ "field_name": "project_id", "previous": previous })
524 }
525 Self::CycleId { previous } => {
526 serde_json::json!({ "field_name": "cycle_id", "previous": previous })
527 }
528 Self::DueDate { previous } => {
529 serde_json::json!({ "field_name": "due_date", "previous": previous })
530 }
531 Self::ParentId { previous } => {
532 serde_json::json!({ "field_name": "parent_id", "previous": previous })
533 }
534 Self::SortOrder { previous } => {
535 serde_json::json!({ "field_name": "sort_order", "previous": previous })
536 }
537 Self::LabelIds { previous } => {
538 serde_json::json!({ "field_name": "label_ids", "previous": previous })
539 }
540 Self::CompletedAt { previous } => {
541 serde_json::json!({ "field_name": "completed_at", "previous": previous })
542 }
543 Self::Other { field, previous } => {
544 serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
545 }
546 };
547 value.serialize(serializer)
548 }
549}
550
551impl<'de> Deserialize<'de> for LinearIssueChange {
552 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
553 where
554 D: Deserializer<'de>,
555 {
556 let value = JsonValue::deserialize(deserializer)?;
557 let field_name = value
558 .get("field_name")
559 .and_then(JsonValue::as_str)
560 .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
561 let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
562 Ok(match field_name {
563 "title" => Self::Title {
564 previous: previous.as_str().map(ToString::to_string),
565 },
566 "description" => Self::Description {
567 previous: previous.as_str().map(ToString::to_string),
568 },
569 "priority" => Self::Priority {
570 previous: parse_json_i64ish(&previous),
571 },
572 "estimate" => Self::Estimate {
573 previous: parse_json_i64ish(&previous),
574 },
575 "state_id" => Self::StateId {
576 previous: previous.as_str().map(ToString::to_string),
577 },
578 "team_id" => Self::TeamId {
579 previous: previous.as_str().map(ToString::to_string),
580 },
581 "assignee_id" => Self::AssigneeId {
582 previous: previous.as_str().map(ToString::to_string),
583 },
584 "project_id" => Self::ProjectId {
585 previous: previous.as_str().map(ToString::to_string),
586 },
587 "cycle_id" => Self::CycleId {
588 previous: previous.as_str().map(ToString::to_string),
589 },
590 "due_date" => Self::DueDate {
591 previous: previous.as_str().map(ToString::to_string),
592 },
593 "parent_id" => Self::ParentId {
594 previous: previous.as_str().map(ToString::to_string),
595 },
596 "sort_order" => Self::SortOrder {
597 previous: previous.as_f64(),
598 },
599 "label_ids" => Self::LabelIds {
600 previous: parse_string_array(&previous),
601 },
602 "completed_at" => Self::CompletedAt {
603 previous: previous.as_str().map(ToString::to_string),
604 },
605 "other" => Self::Other {
606 field: value
607 .get("field")
608 .and_then(JsonValue::as_str)
609 .map(ToString::to_string)
610 .unwrap_or_else(|| "unknown".to_string()),
611 previous,
612 },
613 other => Self::Other {
614 field: other.to_string(),
615 previous,
616 },
617 })
618 }
619}
620
621#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
622pub struct LinearIssueEventPayload {
623 #[serde(flatten)]
624 pub common: LinearEventCommon,
625 pub issue: JsonValue,
626 #[serde(default)]
627 pub changes: Vec<LinearIssueChange>,
628}
629
630#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
631pub struct LinearIssueCommentEventPayload {
632 #[serde(flatten)]
633 pub common: LinearEventCommon,
634 pub comment: JsonValue,
635}
636
637#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
638pub struct LinearIssueLabelEventPayload {
639 #[serde(flatten)]
640 pub common: LinearEventCommon,
641 pub label: JsonValue,
642}
643
644#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
645pub struct LinearProjectEventPayload {
646 #[serde(flatten)]
647 pub common: LinearEventCommon,
648 pub project: JsonValue,
649}
650
651#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
652pub struct LinearCycleEventPayload {
653 #[serde(flatten)]
654 pub common: LinearEventCommon,
655 pub cycle: JsonValue,
656}
657
658#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
659pub struct LinearCustomerEventPayload {
660 #[serde(flatten)]
661 pub common: LinearEventCommon,
662 pub customer: JsonValue,
663}
664
665#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
666pub struct LinearCustomerRequestEventPayload {
667 #[serde(flatten)]
668 pub common: LinearEventCommon,
669 pub customer_request: JsonValue,
670}
671
672#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
673#[serde(untagged)]
674pub enum LinearEventPayload {
675 Issue(LinearIssueEventPayload),
676 IssueComment(LinearIssueCommentEventPayload),
677 IssueLabel(LinearIssueLabelEventPayload),
678 Project(LinearProjectEventPayload),
679 Cycle(LinearCycleEventPayload),
680 Customer(LinearCustomerEventPayload),
681 CustomerRequest(LinearCustomerRequestEventPayload),
682 Other(LinearEventCommon),
683}
684
685#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
686pub struct NotionPolledChangeEvent {
687 pub resource: String,
688 pub source_id: String,
689 pub entity_id: String,
690 pub high_water_mark: String,
691 pub before: Option<JsonValue>,
692 pub after: JsonValue,
693}
694
695#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
696pub struct NotionEventPayload {
697 pub event: String,
698 pub workspace_id: Option<String>,
699 pub request_id: Option<String>,
700 pub subscription_id: Option<String>,
701 pub integration_id: Option<String>,
702 pub attempt_number: Option<u32>,
703 pub entity_id: Option<String>,
704 pub entity_type: Option<String>,
705 pub api_version: Option<String>,
706 pub verification_token: Option<String>,
707 pub polled: Option<NotionPolledChangeEvent>,
708 pub raw: JsonValue,
709}
710
711#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
712pub struct CronEventPayload {
713 pub cron_id: Option<String>,
714 pub schedule: Option<String>,
715 #[serde(with = "time::serde::rfc3339")]
716 pub tick_at: OffsetDateTime,
717 pub raw: JsonValue,
718}
719
720#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
721pub struct GenericWebhookPayload {
722 pub source: Option<String>,
723 pub content_type: Option<String>,
724 pub raw: JsonValue,
725}
726
727#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
728pub struct A2aPushPayload {
729 pub task_id: Option<String>,
730 pub task_state: Option<String>,
731 pub artifact: Option<JsonValue>,
732 pub sender: Option<String>,
733 pub raw: JsonValue,
734 pub kind: String,
735}
736
737#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
738pub struct StreamEventPayload {
739 pub event: String,
740 pub source: Option<String>,
741 pub stream: Option<String>,
742 pub partition: Option<String>,
743 pub offset: Option<String>,
744 pub key: Option<String>,
745 pub timestamp: Option<String>,
746 #[serde(default)]
747 pub headers: BTreeMap<String, String>,
748 pub raw: JsonValue,
749}
750
751#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
752pub struct ExtensionProviderPayload {
753 pub provider: String,
754 pub schema_name: String,
755 pub raw: JsonValue,
756}
757
758#[allow(clippy::large_enum_variant)]
759#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
760#[serde(untagged)]
761pub enum ProviderPayload {
762 Known(KnownProviderPayload),
763 Extension(ExtensionProviderPayload),
764}
765
766impl ProviderPayload {
767 pub fn provider(&self) -> &str {
768 match self {
769 Self::Known(known) => known.provider(),
770 Self::Extension(payload) => payload.provider.as_str(),
771 }
772 }
773
774 pub fn normalize(
775 provider: &ProviderId,
776 kind: &str,
777 headers: &BTreeMap<String, String>,
778 raw: JsonValue,
779 ) -> Result<Self, ProviderCatalogError> {
780 provider_catalog()
781 .read()
782 .expect("provider catalog poisoned")
783 .normalize(provider, kind, headers, raw)
784 }
785}
786
787#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
788#[serde(tag = "provider")]
789pub enum KnownProviderPayload {
790 #[serde(rename = "github")]
791 GitHub(GitHubEventPayload),
792 #[serde(rename = "slack")]
793 Slack(Box<SlackEventPayload>),
794 #[serde(rename = "linear")]
795 Linear(LinearEventPayload),
796 #[serde(rename = "notion")]
797 Notion(Box<NotionEventPayload>),
798 #[serde(rename = "cron")]
799 Cron(CronEventPayload),
800 #[serde(rename = "webhook")]
801 Webhook(GenericWebhookPayload),
802 #[serde(rename = "a2a-push")]
803 A2aPush(A2aPushPayload),
804 #[serde(rename = "kafka")]
805 Kafka(StreamEventPayload),
806 #[serde(rename = "nats")]
807 Nats(StreamEventPayload),
808 #[serde(rename = "pulsar")]
809 Pulsar(StreamEventPayload),
810 #[serde(rename = "postgres-cdc")]
811 PostgresCdc(StreamEventPayload),
812 #[serde(rename = "email")]
813 Email(StreamEventPayload),
814 #[serde(rename = "websocket")]
815 Websocket(StreamEventPayload),
816}
817
818impl KnownProviderPayload {
819 pub fn provider(&self) -> &str {
820 match self {
821 Self::GitHub(_) => "github",
822 Self::Slack(_) => "slack",
823 Self::Linear(_) => "linear",
824 Self::Notion(_) => "notion",
825 Self::Cron(_) => "cron",
826 Self::Webhook(_) => "webhook",
827 Self::A2aPush(_) => "a2a-push",
828 Self::Kafka(_) => "kafka",
829 Self::Nats(_) => "nats",
830 Self::Pulsar(_) => "pulsar",
831 Self::PostgresCdc(_) => "postgres-cdc",
832 Self::Email(_) => "email",
833 Self::Websocket(_) => "websocket",
834 }
835 }
836}
837
838#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
839pub struct TriggerEvent {
840 pub id: TriggerEventId,
841 pub provider: ProviderId,
842 pub kind: String,
843 #[serde(with = "time::serde::rfc3339")]
844 pub received_at: OffsetDateTime,
845 #[serde(with = "time::serde::rfc3339::option")]
846 pub occurred_at: Option<OffsetDateTime>,
847 pub dedupe_key: String,
848 pub trace_id: TraceId,
849 pub tenant_id: Option<TenantId>,
850 pub headers: BTreeMap<String, String>,
851 #[serde(default, skip_serializing_if = "Option::is_none")]
852 pub batch: Option<Vec<JsonValue>>,
853 #[serde(
854 default,
855 skip_serializing_if = "Option::is_none",
856 serialize_with = "serialize_optional_bytes_b64",
857 deserialize_with = "deserialize_optional_bytes_b64"
858 )]
859 pub raw_body: Option<Vec<u8>>,
860 pub provider_payload: ProviderPayload,
861 pub signature_status: SignatureStatus,
862 #[serde(skip)]
863 pub dedupe_claimed: bool,
864}
865
866impl TriggerEvent {
867 #[allow(clippy::too_many_arguments)]
868 pub fn new(
869 provider: ProviderId,
870 kind: impl Into<String>,
871 occurred_at: Option<OffsetDateTime>,
872 dedupe_key: impl Into<String>,
873 tenant_id: Option<TenantId>,
874 headers: BTreeMap<String, String>,
875 provider_payload: ProviderPayload,
876 signature_status: SignatureStatus,
877 ) -> Self {
878 Self {
879 id: TriggerEventId::new(),
880 provider,
881 kind: kind.into(),
882 received_at: clock::now_utc(),
883 occurred_at,
884 dedupe_key: dedupe_key.into(),
885 trace_id: TraceId::new(),
886 tenant_id,
887 headers,
888 batch: None,
889 raw_body: None,
890 provider_payload,
891 signature_status,
892 dedupe_claimed: false,
893 }
894 }
895
896 pub fn dedupe_claimed(&self) -> bool {
897 self.dedupe_claimed
898 }
899
900 pub fn mark_dedupe_claimed(&mut self) {
901 self.dedupe_claimed = true;
902 }
903}
904
905pub type HeaderRedactionPolicy = crate::redact::RedactionPolicy;
912
913pub fn redact_headers(
914 headers: &BTreeMap<String, String>,
915 policy: &HeaderRedactionPolicy,
916) -> BTreeMap<String, String> {
917 policy.redact_headers(headers)
918}
919
920#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
921pub struct ProviderSecretRequirement {
922 pub name: String,
923 pub required: bool,
924 pub namespace: String,
925}
926
927#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
928pub struct ProviderOutboundMethod {
929 pub name: String,
930}
931
932#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
933#[serde(tag = "kind", rename_all = "snake_case")]
934pub enum SignatureVerificationMetadata {
935 #[default]
936 None,
937 Hmac {
938 variant: String,
939 raw_body: bool,
940 signature_header: String,
941 timestamp_header: Option<String>,
942 id_header: Option<String>,
943 default_tolerance_secs: Option<i64>,
944 digest: String,
945 encoding: String,
946 },
947}
948
949#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
950#[serde(tag = "kind", rename_all = "snake_case")]
951pub enum ProviderRuntimeMetadata {
952 Builtin {
953 connector: String,
954 default_signature_variant: Option<String>,
955 },
956 #[default]
957 Placeholder,
958}
959
960#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
961pub struct ProviderMetadata {
962 pub provider: String,
963 #[serde(default)]
964 pub kinds: Vec<String>,
965 pub schema_name: String,
966 #[serde(default)]
967 pub outbound_methods: Vec<ProviderOutboundMethod>,
968 #[serde(default)]
969 pub secret_requirements: Vec<ProviderSecretRequirement>,
970 #[serde(default)]
971 pub signature_verification: SignatureVerificationMetadata,
972 #[serde(default)]
973 pub runtime: ProviderRuntimeMetadata,
974}
975
976impl ProviderMetadata {
977 pub fn supports_kind(&self, kind: &str) -> bool {
978 self.kinds.iter().any(|candidate| candidate == kind)
979 }
980
981 pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
982 self.secret_requirements
983 .iter()
984 .filter(|requirement| requirement.required)
985 .map(|requirement| requirement.name.as_str())
986 }
987}
988
989pub trait ProviderSchema: Send + Sync {
990 fn provider_id(&self) -> &str;
991 fn harn_schema_name(&self) -> &str;
992 fn metadata(&self) -> ProviderMetadata {
993 ProviderMetadata {
994 provider: self.provider_id().to_string(),
995 schema_name: self.harn_schema_name().to_string(),
996 ..ProviderMetadata::default()
997 }
998 }
999 fn normalize(
1000 &self,
1001 kind: &str,
1002 headers: &BTreeMap<String, String>,
1003 raw: JsonValue,
1004 ) -> Result<ProviderPayload, ProviderCatalogError>;
1005}
1006
1007#[derive(Clone, Debug, PartialEq, Eq)]
1008pub enum ProviderCatalogError {
1009 DuplicateProvider(String),
1010 UnknownProvider(String),
1011}
1012
1013impl std::fmt::Display for ProviderCatalogError {
1014 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1015 match self {
1016 Self::DuplicateProvider(provider) => {
1017 write!(f, "provider `{provider}` is already registered")
1018 }
1019 Self::UnknownProvider(provider) => write!(f, "provider `{provider}` is not registered"),
1020 }
1021 }
1022}
1023
1024impl std::error::Error for ProviderCatalogError {}
1025
1026#[derive(Clone, Default)]
1027pub struct ProviderCatalog {
1028 providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
1029}
1030
1031impl ProviderCatalog {
1032 pub fn with_defaults() -> Self {
1033 let mut catalog = Self::default();
1034 for schema in default_provider_schemas() {
1035 catalog
1036 .register(schema)
1037 .expect("default providers must register cleanly");
1038 }
1039 catalog
1040 }
1041
1042 pub fn with_defaults_and(
1043 schemas: Vec<Arc<dyn ProviderSchema>>,
1044 ) -> Result<Self, ProviderCatalogError> {
1045 let mut catalog = Self::with_defaults();
1046 let builtin_providers: BTreeSet<String> = catalog.schema_names().into_keys().collect();
1047 for schema in schemas {
1048 if builtin_providers.contains(schema.provider_id()) {
1049 continue;
1050 }
1051 catalog.register(schema)?;
1052 }
1053 Ok(catalog)
1054 }
1055
1056 pub fn register(
1057 &mut self,
1058 schema: Arc<dyn ProviderSchema>,
1059 ) -> Result<(), ProviderCatalogError> {
1060 let provider = schema.provider_id().to_string();
1061 if self.providers.contains_key(provider.as_str()) {
1062 return Err(ProviderCatalogError::DuplicateProvider(provider));
1063 }
1064 self.providers.insert(provider, schema);
1065 Ok(())
1066 }
1067
1068 pub fn normalize(
1069 &self,
1070 provider: &ProviderId,
1071 kind: &str,
1072 headers: &BTreeMap<String, String>,
1073 raw: JsonValue,
1074 ) -> Result<ProviderPayload, ProviderCatalogError> {
1075 let schema = self
1076 .providers
1077 .get(provider.as_str())
1078 .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
1079 schema.normalize(kind, headers, raw)
1080 }
1081
1082 pub fn schema_names(&self) -> BTreeMap<String, String> {
1083 self.providers
1084 .iter()
1085 .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
1086 .collect()
1087 }
1088
1089 pub fn entries(&self) -> Vec<ProviderMetadata> {
1090 self.providers
1091 .values()
1092 .map(|schema| schema.metadata())
1093 .collect()
1094 }
1095
1096 pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
1097 self.providers.get(provider).map(|schema| schema.metadata())
1098 }
1099}
1100
1101pub fn register_provider_schema(
1102 schema: Arc<dyn ProviderSchema>,
1103) -> Result<(), ProviderCatalogError> {
1104 provider_catalog()
1105 .write()
1106 .expect("provider catalog poisoned")
1107 .register(schema)
1108}
1109
1110pub fn reset_provider_catalog() {
1111 *provider_catalog()
1112 .write()
1113 .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
1114}
1115
1116pub fn reset_provider_catalog_with(
1117 schemas: Vec<Arc<dyn ProviderSchema>>,
1118) -> Result<(), ProviderCatalogError> {
1119 let catalog = ProviderCatalog::with_defaults_and(schemas)?;
1120 install_provider_catalog(catalog);
1121 Ok(())
1122}
1123
1124pub fn install_provider_catalog(catalog: ProviderCatalog) {
1125 *provider_catalog()
1126 .write()
1127 .expect("provider catalog poisoned") = catalog;
1128}
1129
1130pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
1131 provider_catalog()
1132 .read()
1133 .expect("provider catalog poisoned")
1134 .schema_names()
1135}
1136
1137pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
1138 provider_catalog()
1139 .read()
1140 .expect("provider catalog poisoned")
1141 .entries()
1142}
1143
1144pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
1145 provider_catalog()
1146 .read()
1147 .expect("provider catalog poisoned")
1148 .metadata_for(provider)
1149}
1150
1151fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
1152 static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
1153 PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
1154}
1155
1156struct BuiltinProviderSchema {
1157 provider_id: &'static str,
1158 harn_schema_name: &'static str,
1159 metadata: ProviderMetadata,
1160 normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1161}
1162
1163impl ProviderSchema for BuiltinProviderSchema {
1164 fn provider_id(&self) -> &str {
1165 self.provider_id
1166 }
1167
1168 fn harn_schema_name(&self) -> &str {
1169 self.harn_schema_name
1170 }
1171
1172 fn metadata(&self) -> ProviderMetadata {
1173 self.metadata.clone()
1174 }
1175
1176 fn normalize(
1177 &self,
1178 kind: &str,
1179 headers: &BTreeMap<String, String>,
1180 raw: JsonValue,
1181 ) -> Result<ProviderPayload, ProviderCatalogError> {
1182 Ok((self.normalize)(kind, headers, raw))
1183 }
1184}
1185
1186fn provider_metadata_entry(
1187 provider: &str,
1188 kinds: &[&str],
1189 schema_name: &str,
1190 outbound_methods: &[&str],
1191 signature_verification: SignatureVerificationMetadata,
1192 secret_requirements: Vec<ProviderSecretRequirement>,
1193 runtime: ProviderRuntimeMetadata,
1194) -> ProviderMetadata {
1195 ProviderMetadata {
1196 provider: provider.to_string(),
1197 kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
1198 schema_name: schema_name.to_string(),
1199 outbound_methods: outbound_methods
1200 .iter()
1201 .map(|name| ProviderOutboundMethod {
1202 name: (*name).to_string(),
1203 })
1204 .collect(),
1205 secret_requirements,
1206 signature_verification,
1207 runtime,
1208 }
1209}
1210
1211fn hmac_signature_metadata(
1212 variant: &str,
1213 signature_header: &str,
1214 timestamp_header: Option<&str>,
1215 id_header: Option<&str>,
1216 default_tolerance_secs: Option<i64>,
1217 encoding: &str,
1218) -> SignatureVerificationMetadata {
1219 SignatureVerificationMetadata::Hmac {
1220 variant: variant.to_string(),
1221 raw_body: true,
1222 signature_header: signature_header.to_string(),
1223 timestamp_header: timestamp_header.map(ToString::to_string),
1224 id_header: id_header.map(ToString::to_string),
1225 default_tolerance_secs,
1226 digest: "sha256".to_string(),
1227 encoding: encoding.to_string(),
1228 }
1229}
1230
1231fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1232 ProviderSecretRequirement {
1233 name: name.to_string(),
1234 required: true,
1235 namespace: namespace.to_string(),
1236 }
1237}
1238
1239fn outbound_method(name: &str) -> ProviderOutboundMethod {
1240 ProviderOutboundMethod {
1241 name: name.to_string(),
1242 }
1243}
1244
1245fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1246 ProviderSecretRequirement {
1247 name: name.to_string(),
1248 required: false,
1249 namespace: namespace.to_string(),
1250 }
1251}
1252
1253fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1254 vec![
1255 Arc::new(BuiltinProviderSchema {
1256 provider_id: "github",
1257 harn_schema_name: "GitHubEventPayload",
1258 metadata: provider_metadata_entry(
1259 "github",
1260 &["webhook"],
1261 "GitHubEventPayload",
1262 &[
1263 "github.pr.list",
1264 "github.pr.view",
1265 "github.pr.checks",
1266 "github.pr.merge",
1267 "github.pr.enable_auto_merge",
1268 "github.pr.comment",
1269 "github.actions.workflow_dispatch",
1270 "github.actions.runs",
1271 "github.actions.run",
1272 "github.actions.logs",
1273 "github.release.latest",
1274 "github.release.assets",
1275 "github.merge_queue.entries",
1276 "github.merge_queue.enqueue",
1277 "github.issue.create",
1278 "github.issue.comment",
1279 "github.branch.protection",
1280 "api_call",
1281 "issues.create_comment",
1282 "issues.create",
1283 "issues.create_with_template",
1284 "issues.update",
1285 "issues.add_labels",
1286 "pulls.list",
1287 "pulls.list_with_checks",
1288 "pulls.get",
1289 "pulls.merge",
1290 "pulls.merge_safe",
1291 "pulls.create_review_comment",
1292 "pulls.get_diff",
1293 "pulls.list_files",
1294 "pulls.list_reviews",
1295 "repos.get_content",
1296 "repos.get_text",
1297 "repos.get_latest_release",
1298 "repos.list_release_assets",
1299 "repos.get_branch_protection",
1300 "git.delete_ref",
1301 "actions.workflow_dispatch",
1302 "actions.workflow_runs.list",
1303 "actions.workflow_run.get",
1304 "check_runs.create",
1305 "check_runs.update",
1306 "graphql",
1307 ],
1308 hmac_signature_metadata(
1309 "github",
1310 "X-Hub-Signature-256",
1311 None,
1312 Some("X-GitHub-Delivery"),
1313 None,
1314 "hex",
1315 ),
1316 vec![required_secret("signing_secret", "github")],
1317 ProviderRuntimeMetadata::Placeholder,
1318 ),
1319 normalize: github_payload,
1320 }),
1321 Arc::new(BuiltinProviderSchema {
1322 provider_id: "slack",
1323 harn_schema_name: "SlackEventPayload",
1324 metadata: provider_metadata_entry(
1325 "slack",
1326 &["webhook"],
1327 "SlackEventPayload",
1328 &[
1329 "post_message",
1330 "update_message",
1331 "add_reaction",
1332 "open_view",
1333 "user_info",
1334 "api_call",
1335 "upload_file",
1336 ],
1337 hmac_signature_metadata(
1338 "slack",
1339 "X-Slack-Signature",
1340 Some("X-Slack-Request-Timestamp"),
1341 None,
1342 Some(300),
1343 "hex",
1344 ),
1345 vec![required_secret("signing_secret", "slack")],
1346 ProviderRuntimeMetadata::Placeholder,
1347 ),
1348 normalize: slack_payload,
1349 }),
1350 Arc::new(BuiltinProviderSchema {
1351 provider_id: "linear",
1352 harn_schema_name: "LinearEventPayload",
1353 metadata: {
1354 let mut metadata = provider_metadata_entry(
1355 "linear",
1356 &["webhook"],
1357 "LinearEventPayload",
1358 &[],
1359 hmac_signature_metadata(
1360 "linear",
1361 "Linear-Signature",
1362 None,
1363 Some("Linear-Delivery"),
1364 Some(75),
1365 "hex",
1366 ),
1367 vec![
1368 required_secret("signing_secret", "linear"),
1369 optional_secret("access_token", "linear"),
1370 ],
1371 ProviderRuntimeMetadata::Placeholder,
1372 );
1373 metadata.outbound_methods = vec![
1374 ProviderOutboundMethod {
1375 name: "list_issues".to_string(),
1376 },
1377 ProviderOutboundMethod {
1378 name: "update_issue".to_string(),
1379 },
1380 ProviderOutboundMethod {
1381 name: "create_comment".to_string(),
1382 },
1383 ProviderOutboundMethod {
1384 name: "search".to_string(),
1385 },
1386 ProviderOutboundMethod {
1387 name: "graphql".to_string(),
1388 },
1389 ];
1390 metadata
1391 },
1392 normalize: linear_payload,
1393 }),
1394 Arc::new(BuiltinProviderSchema {
1395 provider_id: "notion",
1396 harn_schema_name: "NotionEventPayload",
1397 metadata: {
1398 let mut metadata = provider_metadata_entry(
1399 "notion",
1400 &["webhook", "poll"],
1401 "NotionEventPayload",
1402 &[],
1403 hmac_signature_metadata(
1404 "notion",
1405 "X-Notion-Signature",
1406 None,
1407 None,
1408 None,
1409 "hex",
1410 ),
1411 vec![required_secret("verification_token", "notion")],
1412 ProviderRuntimeMetadata::Placeholder,
1413 );
1414 metadata.outbound_methods = vec![
1415 outbound_method("get_page"),
1416 outbound_method("update_page"),
1417 outbound_method("append_blocks"),
1418 outbound_method("query_database"),
1419 outbound_method("search"),
1420 outbound_method("create_comment"),
1421 outbound_method("api_call"),
1422 ];
1423 metadata
1424 },
1425 normalize: notion_payload,
1426 }),
1427 Arc::new(BuiltinProviderSchema {
1428 provider_id: "cron",
1429 harn_schema_name: "CronEventPayload",
1430 metadata: provider_metadata_entry(
1431 "cron",
1432 &["cron"],
1433 "CronEventPayload",
1434 &[],
1435 SignatureVerificationMetadata::None,
1436 Vec::new(),
1437 ProviderRuntimeMetadata::Builtin {
1438 connector: "cron".to_string(),
1439 default_signature_variant: None,
1440 },
1441 ),
1442 normalize: cron_payload,
1443 }),
1444 Arc::new(BuiltinProviderSchema {
1445 provider_id: "webhook",
1446 harn_schema_name: "GenericWebhookPayload",
1447 metadata: provider_metadata_entry(
1448 "webhook",
1449 &["webhook"],
1450 "GenericWebhookPayload",
1451 &[],
1452 hmac_signature_metadata(
1453 "standard",
1454 "webhook-signature",
1455 Some("webhook-timestamp"),
1456 Some("webhook-id"),
1457 Some(300),
1458 "base64",
1459 ),
1460 vec![required_secret("signing_secret", "webhook")],
1461 ProviderRuntimeMetadata::Builtin {
1462 connector: "webhook".to_string(),
1463 default_signature_variant: Some("standard".to_string()),
1464 },
1465 ),
1466 normalize: webhook_payload,
1467 }),
1468 Arc::new(BuiltinProviderSchema {
1469 provider_id: "a2a-push",
1470 harn_schema_name: "A2aPushPayload",
1471 metadata: provider_metadata_entry(
1472 "a2a-push",
1473 &["a2a-push"],
1474 "A2aPushPayload",
1475 &[],
1476 SignatureVerificationMetadata::None,
1477 Vec::new(),
1478 ProviderRuntimeMetadata::Builtin {
1479 connector: "a2a-push".to_string(),
1480 default_signature_variant: None,
1481 },
1482 ),
1483 normalize: a2a_push_payload,
1484 }),
1485 Arc::new(stream_provider_schema("kafka", kafka_payload)),
1486 Arc::new(stream_provider_schema("nats", nats_payload)),
1487 Arc::new(stream_provider_schema("pulsar", pulsar_payload)),
1488 Arc::new(stream_provider_schema("postgres-cdc", postgres_cdc_payload)),
1489 Arc::new(stream_provider_schema("email", email_payload)),
1490 Arc::new(stream_provider_schema("websocket", websocket_payload)),
1491 ]
1492}
1493
1494fn stream_provider_schema(
1495 provider_id: &'static str,
1496 normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1497) -> BuiltinProviderSchema {
1498 BuiltinProviderSchema {
1499 provider_id,
1500 harn_schema_name: "StreamEventPayload",
1501 metadata: provider_metadata_entry(
1502 provider_id,
1503 &["stream"],
1504 "StreamEventPayload",
1505 &[],
1506 SignatureVerificationMetadata::None,
1507 Vec::new(),
1508 ProviderRuntimeMetadata::Builtin {
1509 connector: "stream".to_string(),
1510 default_signature_variant: None,
1511 },
1512 ),
1513 normalize,
1514 }
1515}
1516
1517fn github_payload(
1518 kind: &str,
1519 headers: &BTreeMap<String, String>,
1520 raw: JsonValue,
1521) -> ProviderPayload {
1522 let original_raw = raw
1527 .get("raw")
1528 .filter(|value| value.is_object())
1529 .cloned()
1530 .unwrap_or_else(|| raw.clone());
1531 let common = GitHubEventCommon {
1532 event: kind.to_string(),
1533 action: raw
1534 .get("action")
1535 .and_then(JsonValue::as_str)
1536 .map(ToString::to_string),
1537 delivery_id: raw
1538 .get("delivery_id")
1539 .and_then(JsonValue::as_str)
1540 .map(ToString::to_string)
1541 .or_else(|| headers.get("X-GitHub-Delivery").cloned()),
1542 installation_id: raw
1543 .get("installation_id")
1544 .and_then(JsonValue::as_i64)
1545 .or_else(|| {
1546 raw.get("installation")
1547 .and_then(|value| value.get("id"))
1548 .and_then(JsonValue::as_i64)
1549 }),
1550 topic: raw
1551 .get("topic")
1552 .and_then(JsonValue::as_str)
1553 .map(ToString::to_string),
1554 repository: raw.get("repository").cloned(),
1555 repo: raw.get("repo").cloned(),
1556 raw: original_raw,
1557 };
1558 let payload = match kind {
1559 "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1560 common,
1561 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1562 }),
1563 "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1564 common,
1565 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1566 }),
1567 "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1568 common,
1569 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1570 comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1571 }),
1572 "pull_request_review" => {
1573 GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1574 common,
1575 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1576 review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1577 })
1578 }
1579 "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1580 common,
1581 commits: raw
1582 .get("commits")
1583 .and_then(JsonValue::as_array)
1584 .cloned()
1585 .unwrap_or_default(),
1586 distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1587 }),
1588 "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1589 common,
1590 workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1591 }),
1592 "deployment_status" => {
1593 GitHubEventPayload::DeploymentStatus(GitHubDeploymentStatusEventPayload {
1594 common,
1595 deployment_status: raw
1596 .get("deployment_status")
1597 .cloned()
1598 .unwrap_or(JsonValue::Null),
1599 deployment: raw.get("deployment").cloned().unwrap_or(JsonValue::Null),
1600 })
1601 }
1602 "check_run" => GitHubEventPayload::CheckRun(GitHubCheckRunEventPayload {
1603 common,
1604 check_run: raw.get("check_run").cloned().unwrap_or(JsonValue::Null),
1605 }),
1606 "check_suite" => {
1607 let check_suite = raw.get("check_suite").cloned().unwrap_or(JsonValue::Null);
1608 GitHubEventPayload::CheckSuite(GitHubCheckSuiteEventPayload {
1609 check_suite_id: github_promoted_i64(&raw, "check_suite_id")
1610 .or_else(|| check_suite.get("id").and_then(JsonValue::as_i64)),
1611 pull_request_number: github_promoted_i64(&raw, "pull_request_number"),
1612 head_sha: github_promoted_string(&raw, "head_sha"),
1613 head_ref: github_promoted_string(&raw, "head_ref"),
1614 base_ref: github_promoted_string(&raw, "base_ref"),
1615 status: github_promoted_string(&raw, "status"),
1616 conclusion: github_promoted_string(&raw, "conclusion"),
1617 common,
1618 check_suite,
1619 })
1620 }
1621 "status" => GitHubEventPayload::Status(GitHubStatusEventPayload {
1622 commit_status: raw
1623 .get("commit_status")
1624 .cloned()
1625 .or_else(|| Some(common.raw.clone())),
1626 status_id: github_promoted_i64(&raw, "status_id")
1627 .or_else(|| common.raw.get("id").and_then(JsonValue::as_i64)),
1628 head_sha: github_promoted_string(&raw, "head_sha").or_else(|| {
1629 common
1630 .raw
1631 .get("sha")
1632 .and_then(JsonValue::as_str)
1633 .map(ToString::to_string)
1634 }),
1635 head_ref: github_promoted_string(&raw, "head_ref"),
1636 base_ref: github_promoted_string(&raw, "base_ref"),
1637 state: github_promoted_string(&raw, "state"),
1638 context: github_promoted_string(&raw, "context"),
1639 target_url: github_promoted_string(&raw, "target_url"),
1640 common,
1641 }),
1642 "merge_group" => {
1643 let merge_group = raw.get("merge_group").cloned().unwrap_or(JsonValue::Null);
1644 GitHubEventPayload::MergeGroup(GitHubMergeGroupEventPayload {
1645 merge_group_id: raw
1646 .get("merge_group_id")
1647 .cloned()
1648 .or_else(|| merge_group.get("id").cloned()),
1649 head_sha: github_promoted_string(&raw, "head_sha").or_else(|| {
1650 merge_group
1651 .get("head_sha")
1652 .and_then(JsonValue::as_str)
1653 .map(ToString::to_string)
1654 }),
1655 head_ref: github_promoted_string(&raw, "head_ref").or_else(|| {
1656 merge_group
1657 .get("head_ref")
1658 .and_then(JsonValue::as_str)
1659 .map(ToString::to_string)
1660 }),
1661 base_sha: github_promoted_string(&raw, "base_sha").or_else(|| {
1662 merge_group
1663 .get("base_sha")
1664 .and_then(JsonValue::as_str)
1665 .map(ToString::to_string)
1666 }),
1667 base_ref: github_promoted_string(&raw, "base_ref").or_else(|| {
1668 merge_group
1669 .get("base_ref")
1670 .and_then(JsonValue::as_str)
1671 .map(ToString::to_string)
1672 }),
1673 pull_requests: raw
1674 .get("pull_requests")
1675 .and_then(JsonValue::as_array)
1676 .cloned()
1677 .unwrap_or_default(),
1678 pull_request_numbers: raw
1679 .get("pull_request_numbers")
1680 .and_then(JsonValue::as_array)
1681 .map(|values| {
1682 values
1683 .iter()
1684 .filter_map(JsonValue::as_i64)
1685 .collect::<Vec<_>>()
1686 })
1687 .unwrap_or_default(),
1688 common,
1689 merge_group,
1690 })
1691 }
1692 "installation" => GitHubEventPayload::Installation(GitHubInstallationEventPayload {
1693 installation: raw.get("installation").cloned(),
1694 account: raw.get("account").cloned(),
1695 installation_state: github_promoted_string(&raw, "installation_state"),
1696 suspended: raw.get("suspended").and_then(JsonValue::as_bool),
1697 revoked: raw.get("revoked").and_then(JsonValue::as_bool),
1698 repositories: raw
1699 .get("repositories")
1700 .and_then(JsonValue::as_array)
1701 .cloned()
1702 .unwrap_or_default(),
1703 common,
1704 }),
1705 "installation_repositories" => GitHubEventPayload::InstallationRepositories(
1706 GitHubInstallationRepositoriesEventPayload {
1707 installation: raw.get("installation").cloned(),
1708 account: raw.get("account").cloned(),
1709 installation_state: github_promoted_string(&raw, "installation_state"),
1710 suspended: raw.get("suspended").and_then(JsonValue::as_bool),
1711 revoked: raw.get("revoked").and_then(JsonValue::as_bool),
1712 repository_selection: github_promoted_string(&raw, "repository_selection"),
1713 repositories_added: raw
1714 .get("repositories_added")
1715 .and_then(JsonValue::as_array)
1716 .cloned()
1717 .unwrap_or_default(),
1718 repositories_removed: raw
1719 .get("repositories_removed")
1720 .and_then(JsonValue::as_array)
1721 .cloned()
1722 .unwrap_or_default(),
1723 common,
1724 },
1725 ),
1726 _ => GitHubEventPayload::Other(common),
1727 };
1728 ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1729}
1730
1731fn github_promoted_string(raw: &JsonValue, field: &str) -> Option<String> {
1732 raw.get(field)
1733 .and_then(JsonValue::as_str)
1734 .map(ToString::to_string)
1735}
1736
1737fn github_promoted_i64(raw: &JsonValue, field: &str) -> Option<i64> {
1738 raw.get(field).and_then(JsonValue::as_i64)
1739}
1740
1741fn slack_payload(
1742 kind: &str,
1743 _headers: &BTreeMap<String, String>,
1744 raw: JsonValue,
1745) -> ProviderPayload {
1746 let event = raw.get("event");
1747 let common = SlackEventCommon {
1748 event: kind.to_string(),
1749 event_id: raw
1750 .get("event_id")
1751 .and_then(JsonValue::as_str)
1752 .map(ToString::to_string),
1753 api_app_id: raw
1754 .get("api_app_id")
1755 .and_then(JsonValue::as_str)
1756 .map(ToString::to_string),
1757 team_id: raw
1758 .get("team_id")
1759 .and_then(JsonValue::as_str)
1760 .map(ToString::to_string),
1761 channel_id: slack_channel_id(event),
1762 user_id: slack_user_id(event),
1763 event_ts: event
1764 .and_then(|value| value.get("event_ts"))
1765 .and_then(JsonValue::as_str)
1766 .map(ToString::to_string),
1767 raw: raw.clone(),
1768 };
1769 let payload = match kind {
1770 kind if kind == "message" || kind.starts_with("message.") => {
1771 SlackEventPayload::Message(SlackMessageEventPayload {
1772 subtype: event
1773 .and_then(|value| value.get("subtype"))
1774 .and_then(JsonValue::as_str)
1775 .map(ToString::to_string),
1776 channel_type: event
1777 .and_then(|value| value.get("channel_type"))
1778 .and_then(JsonValue::as_str)
1779 .map(ToString::to_string),
1780 channel: event
1781 .and_then(|value| value.get("channel"))
1782 .and_then(JsonValue::as_str)
1783 .map(ToString::to_string),
1784 user: event
1785 .and_then(|value| value.get("user"))
1786 .and_then(JsonValue::as_str)
1787 .map(ToString::to_string),
1788 text: event
1789 .and_then(|value| value.get("text"))
1790 .and_then(JsonValue::as_str)
1791 .map(ToString::to_string),
1792 ts: event
1793 .and_then(|value| value.get("ts"))
1794 .and_then(JsonValue::as_str)
1795 .map(ToString::to_string),
1796 thread_ts: event
1797 .and_then(|value| value.get("thread_ts"))
1798 .and_then(JsonValue::as_str)
1799 .map(ToString::to_string),
1800 common,
1801 })
1802 }
1803 "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1804 channel: event
1805 .and_then(|value| value.get("channel"))
1806 .and_then(JsonValue::as_str)
1807 .map(ToString::to_string),
1808 user: event
1809 .and_then(|value| value.get("user"))
1810 .and_then(JsonValue::as_str)
1811 .map(ToString::to_string),
1812 text: event
1813 .and_then(|value| value.get("text"))
1814 .and_then(JsonValue::as_str)
1815 .map(ToString::to_string),
1816 ts: event
1817 .and_then(|value| value.get("ts"))
1818 .and_then(JsonValue::as_str)
1819 .map(ToString::to_string),
1820 thread_ts: event
1821 .and_then(|value| value.get("thread_ts"))
1822 .and_then(JsonValue::as_str)
1823 .map(ToString::to_string),
1824 common,
1825 }),
1826 "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1827 reaction: event
1828 .and_then(|value| value.get("reaction"))
1829 .and_then(JsonValue::as_str)
1830 .map(ToString::to_string),
1831 item_user: event
1832 .and_then(|value| value.get("item_user"))
1833 .and_then(JsonValue::as_str)
1834 .map(ToString::to_string),
1835 item: event
1836 .and_then(|value| value.get("item"))
1837 .cloned()
1838 .unwrap_or(JsonValue::Null),
1839 common,
1840 }),
1841 "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1842 user: event
1843 .and_then(|value| value.get("user"))
1844 .and_then(JsonValue::as_str)
1845 .map(ToString::to_string),
1846 channel: event
1847 .and_then(|value| value.get("channel"))
1848 .and_then(JsonValue::as_str)
1849 .map(ToString::to_string),
1850 tab: event
1851 .and_then(|value| value.get("tab"))
1852 .and_then(JsonValue::as_str)
1853 .map(ToString::to_string),
1854 view: event
1855 .and_then(|value| value.get("view"))
1856 .cloned()
1857 .unwrap_or(JsonValue::Null),
1858 common,
1859 }),
1860 "assistant_thread_started" => {
1861 let assistant_thread = event
1862 .and_then(|value| value.get("assistant_thread"))
1863 .cloned()
1864 .unwrap_or(JsonValue::Null);
1865 SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1866 thread_ts: assistant_thread
1867 .get("thread_ts")
1868 .and_then(JsonValue::as_str)
1869 .map(ToString::to_string),
1870 context: assistant_thread
1871 .get("context")
1872 .cloned()
1873 .unwrap_or(JsonValue::Null),
1874 assistant_thread,
1875 common,
1876 })
1877 }
1878 _ => SlackEventPayload::Other(common),
1879 };
1880 ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1881}
1882
1883fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1884 event
1885 .and_then(|value| value.get("channel"))
1886 .and_then(JsonValue::as_str)
1887 .map(ToString::to_string)
1888 .or_else(|| {
1889 event
1890 .and_then(|value| value.get("item"))
1891 .and_then(|value| value.get("channel"))
1892 .and_then(JsonValue::as_str)
1893 .map(ToString::to_string)
1894 })
1895 .or_else(|| {
1896 event
1897 .and_then(|value| value.get("channel"))
1898 .and_then(|value| value.get("id"))
1899 .and_then(JsonValue::as_str)
1900 .map(ToString::to_string)
1901 })
1902 .or_else(|| {
1903 event
1904 .and_then(|value| value.get("assistant_thread"))
1905 .and_then(|value| value.get("channel_id"))
1906 .and_then(JsonValue::as_str)
1907 .map(ToString::to_string)
1908 })
1909}
1910
1911fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1912 event
1913 .and_then(|value| value.get("user"))
1914 .and_then(JsonValue::as_str)
1915 .map(ToString::to_string)
1916 .or_else(|| {
1917 event
1918 .and_then(|value| value.get("user"))
1919 .and_then(|value| value.get("id"))
1920 .and_then(JsonValue::as_str)
1921 .map(ToString::to_string)
1922 })
1923 .or_else(|| {
1924 event
1925 .and_then(|value| value.get("item_user"))
1926 .and_then(JsonValue::as_str)
1927 .map(ToString::to_string)
1928 })
1929 .or_else(|| {
1930 event
1931 .and_then(|value| value.get("assistant_thread"))
1932 .and_then(|value| value.get("user_id"))
1933 .and_then(JsonValue::as_str)
1934 .map(ToString::to_string)
1935 })
1936}
1937
1938fn linear_payload(
1939 _kind: &str,
1940 headers: &BTreeMap<String, String>,
1941 raw: JsonValue,
1942) -> ProviderPayload {
1943 let common = linear_event_common(headers, &raw);
1944 let event = common.event.clone();
1945 let payload = match event.as_str() {
1946 "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1947 common,
1948 issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1949 changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1950 }),
1951 "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1952 common,
1953 comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1954 }),
1955 "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1956 common,
1957 label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1958 }),
1959 "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1960 common,
1961 project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1962 }),
1963 "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1964 common,
1965 cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1966 }),
1967 "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
1968 common,
1969 customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1970 }),
1971 "customer_request" => {
1972 LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
1973 common,
1974 customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1975 })
1976 }
1977 _ => LinearEventPayload::Other(common),
1978 };
1979 ProviderPayload::Known(KnownProviderPayload::Linear(payload))
1980}
1981
1982fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
1983 LinearEventCommon {
1984 event: linear_event_name(
1985 raw.get("type")
1986 .and_then(JsonValue::as_str)
1987 .or_else(|| headers.get("Linear-Event").map(String::as_str)),
1988 ),
1989 action: raw
1990 .get("action")
1991 .and_then(JsonValue::as_str)
1992 .map(ToString::to_string),
1993 delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
1994 organization_id: raw
1995 .get("organizationId")
1996 .and_then(JsonValue::as_str)
1997 .map(ToString::to_string),
1998 webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
1999 webhook_id: raw
2000 .get("webhookId")
2001 .and_then(JsonValue::as_str)
2002 .map(ToString::to_string),
2003 url: raw
2004 .get("url")
2005 .and_then(JsonValue::as_str)
2006 .map(ToString::to_string),
2007 created_at: raw
2008 .get("createdAt")
2009 .and_then(JsonValue::as_str)
2010 .map(ToString::to_string),
2011 actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
2012 raw: raw.clone(),
2013 }
2014}
2015
2016fn linear_event_name(raw_type: Option<&str>) -> String {
2017 match raw_type.unwrap_or_default().to_ascii_lowercase().as_str() {
2018 "issue" => "issue".to_string(),
2019 "comment" | "issuecomment" | "issue_comment" => "comment".to_string(),
2020 "issuelabel" | "issue_label" => "issue_label".to_string(),
2021 "project" | "projectupdate" | "project_update" => "project".to_string(),
2022 "cycle" => "cycle".to_string(),
2023 "customer" => "customer".to_string(),
2024 "customerrequest" | "customer_request" => "customer_request".to_string(),
2025 other if !other.is_empty() => other.to_string(),
2026 _ => "other".to_string(),
2027 }
2028}
2029
2030fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
2031 let Some(JsonValue::Object(fields)) = updated_from else {
2032 return Vec::new();
2033 };
2034 let mut changes = Vec::new();
2035 for (field, previous) in fields {
2036 let change = match field.as_str() {
2037 "title" => LinearIssueChange::Title {
2038 previous: previous.as_str().map(ToString::to_string),
2039 },
2040 "description" => LinearIssueChange::Description {
2041 previous: previous.as_str().map(ToString::to_string),
2042 },
2043 "priority" => LinearIssueChange::Priority {
2044 previous: parse_json_i64ish(previous),
2045 },
2046 "estimate" => LinearIssueChange::Estimate {
2047 previous: parse_json_i64ish(previous),
2048 },
2049 "stateId" => LinearIssueChange::StateId {
2050 previous: previous.as_str().map(ToString::to_string),
2051 },
2052 "teamId" => LinearIssueChange::TeamId {
2053 previous: previous.as_str().map(ToString::to_string),
2054 },
2055 "assigneeId" => LinearIssueChange::AssigneeId {
2056 previous: previous.as_str().map(ToString::to_string),
2057 },
2058 "projectId" => LinearIssueChange::ProjectId {
2059 previous: previous.as_str().map(ToString::to_string),
2060 },
2061 "cycleId" => LinearIssueChange::CycleId {
2062 previous: previous.as_str().map(ToString::to_string),
2063 },
2064 "dueDate" => LinearIssueChange::DueDate {
2065 previous: previous.as_str().map(ToString::to_string),
2066 },
2067 "parentId" => LinearIssueChange::ParentId {
2068 previous: previous.as_str().map(ToString::to_string),
2069 },
2070 "sortOrder" => LinearIssueChange::SortOrder {
2071 previous: previous.as_f64(),
2072 },
2073 "labelIds" => LinearIssueChange::LabelIds {
2074 previous: parse_string_array(previous),
2075 },
2076 "completedAt" => LinearIssueChange::CompletedAt {
2077 previous: previous.as_str().map(ToString::to_string),
2078 },
2079 _ => LinearIssueChange::Other {
2080 field: field.clone(),
2081 previous: previous.clone(),
2082 },
2083 };
2084 changes.push(change);
2085 }
2086 changes
2087}
2088
2089fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
2090 value
2091 .as_i64()
2092 .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
2093 .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
2094}
2095
2096fn parse_string_array(value: &JsonValue) -> Vec<String> {
2097 let Some(array) = value.as_array() else {
2098 return Vec::new();
2099 };
2100 array
2101 .iter()
2102 .filter_map(|entry| {
2103 entry.as_str().map(ToString::to_string).or_else(|| {
2104 entry
2105 .get("id")
2106 .and_then(JsonValue::as_str)
2107 .map(ToString::to_string)
2108 })
2109 })
2110 .collect()
2111}
2112
2113fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
2114 headers
2115 .iter()
2116 .find(|(key, _)| key.eq_ignore_ascii_case(name))
2117 .map(|(_, value)| value.as_str())
2118}
2119
2120fn notion_payload(
2121 kind: &str,
2122 headers: &BTreeMap<String, String>,
2123 raw: JsonValue,
2124) -> ProviderPayload {
2125 let workspace_id = raw
2126 .get("workspace_id")
2127 .and_then(JsonValue::as_str)
2128 .map(ToString::to_string);
2129 ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
2130 event: kind.to_string(),
2131 workspace_id,
2132 request_id: headers
2133 .get("request-id")
2134 .cloned()
2135 .or_else(|| headers.get("x-request-id").cloned()),
2136 subscription_id: raw
2137 .get("subscription_id")
2138 .and_then(JsonValue::as_str)
2139 .map(ToString::to_string),
2140 integration_id: raw
2141 .get("integration_id")
2142 .and_then(JsonValue::as_str)
2143 .map(ToString::to_string),
2144 attempt_number: raw
2145 .get("attempt_number")
2146 .and_then(JsonValue::as_u64)
2147 .and_then(|value| u32::try_from(value).ok()),
2148 entity_id: raw
2149 .get("entity")
2150 .and_then(|value| value.get("id"))
2151 .and_then(JsonValue::as_str)
2152 .map(ToString::to_string),
2153 entity_type: raw
2154 .get("entity")
2155 .and_then(|value| value.get("type"))
2156 .and_then(JsonValue::as_str)
2157 .map(ToString::to_string),
2158 api_version: raw
2159 .get("api_version")
2160 .and_then(JsonValue::as_str)
2161 .map(ToString::to_string),
2162 verification_token: raw
2163 .get("verification_token")
2164 .and_then(JsonValue::as_str)
2165 .map(ToString::to_string),
2166 polled: None,
2167 raw,
2168 })))
2169}
2170
2171fn cron_payload(
2172 _kind: &str,
2173 _headers: &BTreeMap<String, String>,
2174 raw: JsonValue,
2175) -> ProviderPayload {
2176 let cron_id = raw
2177 .get("cron_id")
2178 .and_then(JsonValue::as_str)
2179 .map(ToString::to_string);
2180 let schedule = raw
2181 .get("schedule")
2182 .and_then(JsonValue::as_str)
2183 .map(ToString::to_string);
2184 let tick_at = raw
2185 .get("tick_at")
2186 .and_then(JsonValue::as_str)
2187 .and_then(parse_rfc3339)
2188 .unwrap_or_else(OffsetDateTime::now_utc);
2189 ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
2190 cron_id,
2191 schedule,
2192 tick_at,
2193 raw,
2194 }))
2195}
2196
2197fn webhook_payload(
2198 _kind: &str,
2199 headers: &BTreeMap<String, String>,
2200 raw: JsonValue,
2201) -> ProviderPayload {
2202 ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
2203 source: headers.get("X-Webhook-Source").cloned(),
2204 content_type: headers.get("Content-Type").cloned(),
2205 raw,
2206 }))
2207}
2208
2209fn a2a_push_payload(
2210 _kind: &str,
2211 _headers: &BTreeMap<String, String>,
2212 raw: JsonValue,
2213) -> ProviderPayload {
2214 let task_id = raw
2215 .get("task_id")
2216 .and_then(JsonValue::as_str)
2217 .map(ToString::to_string);
2218 let sender = raw
2219 .get("sender")
2220 .and_then(JsonValue::as_str)
2221 .map(ToString::to_string);
2222 let task_state = raw
2223 .pointer("/status/state")
2224 .or_else(|| raw.pointer("/statusUpdate/status/state"))
2225 .and_then(JsonValue::as_str)
2226 .map(|state| match state {
2227 "cancelled" => "canceled".to_string(),
2228 other => other.to_string(),
2229 });
2230 let artifact = raw
2231 .pointer("/artifactUpdate/artifact")
2232 .or_else(|| raw.get("artifact"))
2233 .cloned();
2234 let kind = task_state
2235 .as_deref()
2236 .map(|state| format!("a2a.task.{state}"))
2237 .unwrap_or_else(|| "a2a.task.update".to_string());
2238 ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
2239 task_id,
2240 task_state,
2241 artifact,
2242 sender,
2243 raw,
2244 kind,
2245 }))
2246}
2247
2248fn kafka_payload(
2249 kind: &str,
2250 headers: &BTreeMap<String, String>,
2251 raw: JsonValue,
2252) -> ProviderPayload {
2253 ProviderPayload::Known(KnownProviderPayload::Kafka(stream_payload(
2254 kind, headers, raw,
2255 )))
2256}
2257
2258fn nats_payload(kind: &str, headers: &BTreeMap<String, String>, raw: JsonValue) -> ProviderPayload {
2259 ProviderPayload::Known(KnownProviderPayload::Nats(stream_payload(
2260 kind, headers, raw,
2261 )))
2262}
2263
2264fn pulsar_payload(
2265 kind: &str,
2266 headers: &BTreeMap<String, String>,
2267 raw: JsonValue,
2268) -> ProviderPayload {
2269 ProviderPayload::Known(KnownProviderPayload::Pulsar(stream_payload(
2270 kind, headers, raw,
2271 )))
2272}
2273
2274fn postgres_cdc_payload(
2275 kind: &str,
2276 headers: &BTreeMap<String, String>,
2277 raw: JsonValue,
2278) -> ProviderPayload {
2279 ProviderPayload::Known(KnownProviderPayload::PostgresCdc(stream_payload(
2280 kind, headers, raw,
2281 )))
2282}
2283
2284fn email_payload(
2285 kind: &str,
2286 headers: &BTreeMap<String, String>,
2287 raw: JsonValue,
2288) -> ProviderPayload {
2289 ProviderPayload::Known(KnownProviderPayload::Email(stream_payload(
2290 kind, headers, raw,
2291 )))
2292}
2293
2294fn websocket_payload(
2295 kind: &str,
2296 headers: &BTreeMap<String, String>,
2297 raw: JsonValue,
2298) -> ProviderPayload {
2299 ProviderPayload::Known(KnownProviderPayload::Websocket(stream_payload(
2300 kind, headers, raw,
2301 )))
2302}
2303
2304fn stream_payload(
2305 kind: &str,
2306 headers: &BTreeMap<String, String>,
2307 raw: JsonValue,
2308) -> StreamEventPayload {
2309 StreamEventPayload {
2310 event: kind.to_string(),
2311 source: json_stringish(&raw, &["source", "connector", "origin"]),
2312 stream: json_stringish(
2313 &raw,
2314 &["stream", "topic", "subject", "channel", "mailbox", "slot"],
2315 ),
2316 partition: json_stringish(&raw, &["partition", "shard", "consumer"]),
2317 offset: json_stringish(&raw, &["offset", "sequence", "lsn", "message_id"]),
2318 key: json_stringish(&raw, &["key", "message_key", "id", "event_id"]),
2319 timestamp: json_stringish(&raw, &["timestamp", "occurred_at", "received_at", "ts"]),
2320 headers: headers.clone(),
2321 raw,
2322 }
2323}
2324
2325fn json_stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
2326 fields.iter().find_map(|field| {
2327 let value = raw.get(*field)?;
2328 value
2329 .as_str()
2330 .map(ToString::to_string)
2331 .or_else(|| parse_json_i64ish(value).map(|number| number.to_string()))
2332 .or_else(|| value.as_u64().map(|number| number.to_string()))
2333 })
2334}
2335
2336fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
2337 OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
2338}
2339
2340#[cfg(test)]
2341mod tests {
2342 use super::*;
2343
2344 struct OwnedProviderSchema {
2345 metadata: ProviderMetadata,
2346 }
2347
2348 impl OwnedProviderSchema {
2349 fn new(provider: &str, schema_name: &str) -> Self {
2350 Self {
2351 metadata: ProviderMetadata {
2352 provider: provider.to_string(),
2353 kinds: vec!["webhook".to_string()],
2354 schema_name: schema_name.to_string(),
2355 runtime: ProviderRuntimeMetadata::Placeholder,
2356 ..ProviderMetadata::default()
2357 },
2358 }
2359 }
2360 }
2361
2362 impl ProviderSchema for OwnedProviderSchema {
2363 fn provider_id(&self) -> &str {
2364 &self.metadata.provider
2365 }
2366
2367 fn harn_schema_name(&self) -> &str {
2368 &self.metadata.schema_name
2369 }
2370
2371 fn metadata(&self) -> ProviderMetadata {
2372 self.metadata.clone()
2373 }
2374
2375 fn normalize(
2376 &self,
2377 _kind: &str,
2378 _headers: &BTreeMap<String, String>,
2379 raw: JsonValue,
2380 ) -> Result<ProviderPayload, ProviderCatalogError> {
2381 Ok(ProviderPayload::Extension(ExtensionProviderPayload {
2382 provider: self.metadata.provider.clone(),
2383 schema_name: self.metadata.schema_name.clone(),
2384 raw,
2385 }))
2386 }
2387 }
2388
2389 fn owned_provider_schema(provider: &str, schema_name: &str) -> Arc<dyn ProviderSchema> {
2390 Arc::new(OwnedProviderSchema::new(provider, schema_name))
2391 }
2392
2393 fn sample_headers() -> BTreeMap<String, String> {
2394 BTreeMap::from([
2395 ("Authorization".to_string(), "Bearer secret".to_string()),
2396 ("Cookie".to_string(), "session=abc".to_string()),
2397 ("User-Agent".to_string(), "GitHub-Hookshot/123".to_string()),
2398 ("X-GitHub-Delivery".to_string(), "delivery-123".to_string()),
2399 ("X-GitHub-Event".to_string(), "issues".to_string()),
2400 ("X-Webhook-Token".to_string(), "token".to_string()),
2401 ])
2402 }
2403
2404 #[test]
2405 fn default_redaction_policy_keeps_safe_headers() {
2406 let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
2407 assert_eq!(redacted.get("User-Agent").unwrap(), "GitHub-Hookshot/123");
2408 assert_eq!(redacted.get("X-GitHub-Delivery").unwrap(), "delivery-123");
2409 assert_eq!(
2410 redacted.get("Authorization").unwrap(),
2411 REDACTED_HEADER_VALUE
2412 );
2413 assert_eq!(redacted.get("Cookie").unwrap(), REDACTED_HEADER_VALUE);
2414 assert_eq!(
2415 redacted.get("X-Webhook-Token").unwrap(),
2416 REDACTED_HEADER_VALUE
2417 );
2418 }
2419
2420 #[test]
2421 fn provider_catalog_rejects_duplicates() {
2422 let mut catalog = ProviderCatalog::default();
2423 catalog
2424 .register(Arc::new(BuiltinProviderSchema {
2425 provider_id: "github",
2426 harn_schema_name: "GitHubEventPayload",
2427 metadata: provider_metadata_entry(
2428 "github",
2429 &["webhook"],
2430 "GitHubEventPayload",
2431 &[],
2432 SignatureVerificationMetadata::None,
2433 Vec::new(),
2434 ProviderRuntimeMetadata::Placeholder,
2435 ),
2436 normalize: github_payload,
2437 }))
2438 .unwrap();
2439 let error = catalog
2440 .register(Arc::new(BuiltinProviderSchema {
2441 provider_id: "github",
2442 harn_schema_name: "GitHubEventPayload",
2443 metadata: provider_metadata_entry(
2444 "github",
2445 &["webhook"],
2446 "GitHubEventPayload",
2447 &[],
2448 SignatureVerificationMetadata::None,
2449 Vec::new(),
2450 ProviderRuntimeMetadata::Placeholder,
2451 ),
2452 normalize: github_payload,
2453 }))
2454 .unwrap_err();
2455 assert_eq!(
2456 error,
2457 ProviderCatalogError::DuplicateProvider("github".to_string())
2458 );
2459 }
2460
2461 #[test]
2462 fn provider_catalog_builds_independent_owned_dynamic_catalogs() {
2463 let first = ProviderCatalog::with_defaults_and(vec![owned_provider_schema(
2464 "runtime-a",
2465 "RuntimeAPayload",
2466 )])
2467 .unwrap();
2468 assert_eq!(
2469 first
2470 .metadata_for("runtime-a")
2471 .expect("first dynamic provider")
2472 .schema_name,
2473 "RuntimeAPayload"
2474 );
2475 assert!(first.metadata_for("runtime-b").is_none());
2476
2477 let second = ProviderCatalog::with_defaults_and(vec![owned_provider_schema(
2478 "runtime-b",
2479 "RuntimeBPayload",
2480 )])
2481 .unwrap();
2482 assert!(second.metadata_for("runtime-a").is_none());
2483 assert_eq!(
2484 second
2485 .metadata_for("runtime-b")
2486 .expect("second dynamic provider")
2487 .schema_name,
2488 "RuntimeBPayload"
2489 );
2490 assert!(first.metadata_for("runtime-a").is_some());
2491 }
2492
2493 #[test]
2494 fn registered_provider_metadata_marks_builtin_connectors() {
2495 let entries = registered_provider_metadata();
2496 let builtin: Vec<&ProviderMetadata> = entries
2497 .iter()
2498 .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
2499 .collect();
2500
2501 assert_eq!(builtin.len(), 9);
2502 assert!(builtin.iter().any(|entry| entry.provider == "a2a-push"));
2503 assert!(builtin.iter().any(|entry| entry.provider == "cron"));
2504 assert!(builtin.iter().any(|entry| entry.provider == "webhook"));
2505 for provider in ["github", "linear", "notion", "slack"] {
2506 let entry = entries
2507 .iter()
2508 .find(|entry| entry.provider == provider)
2509 .expect("first-party package-backed provider metadata");
2510 assert!(matches!(
2511 entry.runtime,
2512 ProviderRuntimeMetadata::Placeholder
2513 ));
2514 }
2515 let kafka = entries
2516 .iter()
2517 .find(|entry| entry.provider == "kafka")
2518 .expect("kafka stream provider");
2519 assert_eq!(kafka.kinds, vec!["stream".to_string()]);
2520 assert_eq!(kafka.schema_name, "StreamEventPayload");
2521 assert!(matches!(
2522 kafka.runtime,
2523 ProviderRuntimeMetadata::Builtin {
2524 ref connector,
2525 default_signature_variant: None
2526 } if connector == "stream"
2527 ));
2528 }
2529
2530 #[test]
2531 fn trigger_event_round_trip_is_stable() {
2532 let provider = ProviderId::from("github");
2533 let headers = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
2534 let payload = ProviderPayload::normalize(
2535 &provider,
2536 "issues",
2537 &sample_headers(),
2538 serde_json::json!({
2539 "action": "opened",
2540 "installation": {"id": 42},
2541 "issue": {"number": 99}
2542 }),
2543 )
2544 .unwrap();
2545 let event = TriggerEvent {
2546 id: TriggerEventId("trigger_evt_fixed".to_string()),
2547 provider,
2548 kind: "issues".to_string(),
2549 received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
2550 occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
2551 dedupe_key: "delivery-123".to_string(),
2552 trace_id: TraceId("trace_fixed".to_string()),
2553 tenant_id: Some(TenantId("tenant_1".to_string())),
2554 headers,
2555 provider_payload: payload,
2556 signature_status: SignatureStatus::Verified,
2557 dedupe_claimed: false,
2558 batch: None,
2559 raw_body: Some(vec![0, 159, 255, 10]),
2560 };
2561
2562 let once = serde_json::to_value(&event).unwrap();
2563 assert_eq!(once["raw_body"], serde_json::json!("AJ//Cg=="));
2564 let decoded: TriggerEvent = serde_json::from_value(once.clone()).unwrap();
2565 let twice = serde_json::to_value(&decoded).unwrap();
2566 assert_eq!(decoded, event);
2567 assert_eq!(once, twice);
2568 }
2569
2570 #[test]
2571 fn unknown_provider_errors() {
2572 let error = ProviderPayload::normalize(
2573 &ProviderId::from("custom-provider"),
2574 "thing.happened",
2575 &BTreeMap::new(),
2576 serde_json::json!({"ok": true}),
2577 )
2578 .unwrap_err();
2579 assert_eq!(
2580 error,
2581 ProviderCatalogError::UnknownProvider("custom-provider".to_string())
2582 );
2583 }
2584
2585 fn github_headers(event: &str, delivery: &str) -> BTreeMap<String, String> {
2586 BTreeMap::from([
2587 ("X-GitHub-Event".to_string(), event.to_string()),
2588 ("X-GitHub-Delivery".to_string(), delivery.to_string()),
2589 ])
2590 }
2591
2592 fn unwrap_github(payload: ProviderPayload) -> GitHubEventPayload {
2593 match payload {
2594 ProviderPayload::Known(KnownProviderPayload::GitHub(p)) => p,
2595 other => panic!("expected GitHub payload, got {other:?}"),
2596 }
2597 }
2598
2599 fn connector_normalized(
2603 event: &str,
2604 delivery: &str,
2605 installation_id: i64,
2606 action: Option<&str>,
2607 original: serde_json::Value,
2608 promoted: serde_json::Value,
2609 ) -> serde_json::Value {
2610 let mut common = serde_json::json!({
2611 "provider": "github",
2612 "event": event,
2613 "topic": match action {
2614 Some(a) => format!("github.{event}.{a}"),
2615 None => format!("github.{event}"),
2616 },
2617 "delivery_id": delivery,
2618 "installation_id": installation_id,
2619 "repository": original.get("repository").cloned().unwrap_or(JsonValue::Null),
2620 "repo": serde_json::json!({"owner": "octo-org", "name": "octo-repo", "full_name": "octo-org/octo-repo"}),
2621 "raw": original,
2622 });
2623 if let Some(a) = action {
2624 common["action"] = serde_json::json!(a);
2625 }
2626 let common_obj = common.as_object_mut().unwrap();
2627 if let Some(promoted_obj) = promoted.as_object() {
2628 for (k, v) in promoted_obj {
2629 common_obj.insert(k.clone(), v.clone());
2630 }
2631 }
2632 common
2633 }
2634
2635 #[test]
2636 fn github_check_suite_event_promotes_typed_fields() {
2637 let original = serde_json::json!({
2638 "action": "requested",
2639 "check_suite": {
2640 "id": 8101,
2641 "status": "queued",
2642 "conclusion": null,
2643 "head_sha": "ccccccccccccccccccccccccccccccccccccccc1",
2644 "head_branch": "feature/x",
2645 },
2646 "repository": {"full_name": "octo-org/octo-repo"},
2647 "installation": {"id": 3001},
2648 });
2649 let normalized = connector_normalized(
2650 "check_suite",
2651 "delivery-cs",
2652 3001,
2653 Some("requested"),
2654 original.clone(),
2655 serde_json::json!({
2656 "check_suite": original["check_suite"].clone(),
2657 "check_suite_id": 8101,
2658 "head_sha": "ccccccccccccccccccccccccccccccccccccccc1",
2659 "head_ref": "feature/x",
2660 "status": "queued",
2661 }),
2662 );
2663 let provider = ProviderId::from("github");
2664 let payload = ProviderPayload::normalize(
2665 &provider,
2666 "check_suite",
2667 &github_headers("check_suite", "delivery-cs"),
2668 normalized,
2669 )
2670 .expect("check_suite payload");
2671 let GitHubEventPayload::CheckSuite(check_suite) = unwrap_github(payload) else {
2672 panic!("expected CheckSuite variant");
2673 };
2674 assert_eq!(check_suite.common.event, "check_suite");
2675 assert_eq!(check_suite.common.action.as_deref(), Some("requested"));
2676 assert_eq!(
2677 check_suite.common.delivery_id.as_deref(),
2678 Some("delivery-cs")
2679 );
2680 assert_eq!(check_suite.common.installation_id, Some(3001));
2681 assert_eq!(
2682 check_suite.common.topic.as_deref(),
2683 Some("github.check_suite.requested")
2684 );
2685 assert!(check_suite.common.repository.is_some());
2686 assert!(check_suite.common.repo.is_some());
2687 assert_eq!(check_suite.check_suite_id, Some(8101));
2688 assert_eq!(
2689 check_suite.head_sha.as_deref(),
2690 Some("ccccccccccccccccccccccccccccccccccccccc1")
2691 );
2692 assert_eq!(check_suite.head_ref.as_deref(), Some("feature/x"));
2693 assert_eq!(check_suite.status.as_deref(), Some("queued"));
2694 assert_eq!(check_suite.common.raw, original);
2696 }
2697
2698 #[test]
2699 fn github_status_event_promotes_typed_fields() {
2700 let original = serde_json::json!({
2701 "id": 9101,
2702 "sha": "ccccccccccccccccccccccccccccccccccccccc1",
2703 "state": "success",
2704 "context": "legacy/status",
2705 "target_url": "https://ci.example.test/octo-repo/9101",
2706 "branches": [{"name": "main"}],
2707 "repository": {"full_name": "octo-org/octo-repo"},
2708 "installation": {"id": 3001},
2709 });
2710 let normalized = connector_normalized(
2711 "status",
2712 "delivery-status",
2713 3001,
2714 None,
2715 original.clone(),
2716 serde_json::json!({
2717 "commit_status": original.clone(),
2718 "status_id": 9101,
2719 "head_sha": "ccccccccccccccccccccccccccccccccccccccc1",
2720 "head_ref": "main",
2721 "base_ref": "main",
2722 "state": "success",
2723 "context": "legacy/status",
2724 "target_url": "https://ci.example.test/octo-repo/9101",
2725 }),
2726 );
2727 let provider = ProviderId::from("github");
2728 let payload = ProviderPayload::normalize(
2729 &provider,
2730 "status",
2731 &github_headers("status", "delivery-status"),
2732 normalized,
2733 )
2734 .expect("status payload");
2735 let GitHubEventPayload::Status(status) = unwrap_github(payload) else {
2736 panic!("expected Status variant");
2737 };
2738 assert_eq!(status.common.event, "status");
2739 assert_eq!(status.common.installation_id, Some(3001));
2740 assert_eq!(status.status_id, Some(9101));
2741 assert_eq!(status.state.as_deref(), Some("success"));
2742 assert_eq!(status.context.as_deref(), Some("legacy/status"));
2743 assert_eq!(
2744 status.target_url.as_deref(),
2745 Some("https://ci.example.test/octo-repo/9101")
2746 );
2747 assert_eq!(
2748 status.head_sha.as_deref(),
2749 Some("ccccccccccccccccccccccccccccccccccccccc1")
2750 );
2751 assert!(status.commit_status.is_some());
2752 }
2753
2754 #[test]
2755 fn github_merge_group_event_promotes_typed_fields() {
2756 let original = serde_json::json!({
2757 "action": "checks_requested",
2758 "merge_group": {
2759 "id": 9201,
2760 "head_ref": "gh-readonly-queue/main/pr-42",
2761 "head_sha": "ddddddddddddddddddddddddddddddddddddddd1",
2762 "base_ref": "main",
2763 "base_sha": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa1",
2764 "pull_requests": [{"number": 42}, {"number": 43}],
2765 },
2766 "repository": {"full_name": "octo-org/octo-repo"},
2767 "installation": {"id": 3001},
2768 });
2769 let normalized = connector_normalized(
2770 "merge_group",
2771 "delivery-mg",
2772 3001,
2773 Some("checks_requested"),
2774 original.clone(),
2775 serde_json::json!({
2776 "merge_group": original["merge_group"].clone(),
2777 "merge_group_id": 9201,
2778 "head_sha": "ddddddddddddddddddddddddddddddddddddddd1",
2779 "head_ref": "gh-readonly-queue/main/pr-42",
2780 "base_sha": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa1",
2781 "base_ref": "main",
2782 "pull_requests": [{"number": 42}, {"number": 43}],
2783 "pull_request_numbers": [42, 43],
2784 }),
2785 );
2786 let provider = ProviderId::from("github");
2787 let payload = ProviderPayload::normalize(
2788 &provider,
2789 "merge_group",
2790 &github_headers("merge_group", "delivery-mg"),
2791 normalized,
2792 )
2793 .expect("merge_group payload");
2794 let GitHubEventPayload::MergeGroup(mg) = unwrap_github(payload) else {
2795 panic!("expected MergeGroup variant");
2796 };
2797 assert_eq!(mg.common.event, "merge_group");
2798 assert_eq!(mg.common.action.as_deref(), Some("checks_requested"));
2799 assert_eq!(mg.merge_group_id, Some(serde_json::json!(9201)));
2800 assert_eq!(mg.head_ref.as_deref(), Some("gh-readonly-queue/main/pr-42"));
2801 assert_eq!(
2802 mg.base_sha.as_deref(),
2803 Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa1")
2804 );
2805 assert_eq!(mg.base_ref.as_deref(), Some("main"));
2806 assert_eq!(mg.pull_request_numbers, vec![42i64, 43i64]);
2807 assert_eq!(mg.pull_requests.len(), 2);
2808 }
2809
2810 #[test]
2811 fn github_installation_event_promotes_typed_fields() {
2812 let original = serde_json::json!({
2813 "action": "suspend",
2814 "installation": {
2815 "id": 3001,
2816 "account": {"login": "octo-org"},
2817 "repository_selection": "selected",
2818 "suspended_at": "2026-04-20T18:00:00Z",
2819 },
2820 "repositories": [{"full_name": "octo-org/octo-repo"}],
2821 });
2822 let normalized = connector_normalized(
2823 "installation",
2824 "delivery-inst",
2825 3001,
2826 Some("suspend"),
2827 original.clone(),
2828 serde_json::json!({
2829 "installation": original["installation"].clone(),
2830 "account": {"login": "octo-org"},
2831 "installation_state": "suspended",
2832 "suspended": true,
2833 "revoked": false,
2834 "repositories": original["repositories"].clone(),
2835 }),
2836 );
2837 let provider = ProviderId::from("github");
2838 let payload = ProviderPayload::normalize(
2839 &provider,
2840 "installation",
2841 &github_headers("installation", "delivery-inst"),
2842 normalized,
2843 )
2844 .expect("installation payload");
2845 let GitHubEventPayload::Installation(inst) = unwrap_github(payload) else {
2846 panic!("expected Installation variant");
2847 };
2848 assert_eq!(inst.common.event, "installation");
2849 assert_eq!(inst.common.action.as_deref(), Some("suspend"));
2850 assert_eq!(inst.installation_state.as_deref(), Some("suspended"));
2851 assert_eq!(inst.suspended, Some(true));
2852 assert_eq!(inst.revoked, Some(false));
2853 assert_eq!(inst.repositories.len(), 1);
2854 assert!(inst.account.is_some());
2855 }
2856
2857 #[test]
2858 fn github_installation_repositories_event_promotes_typed_fields() {
2859 let original = serde_json::json!({
2860 "action": "removed",
2861 "installation": {"id": 3001, "account": {"login": "octo-org"}},
2862 "repository_selection": "selected",
2863 "repositories_added": [],
2864 "repositories_removed": [
2865 {"id": 4001, "full_name": "octo-org/octo-repo"},
2866 ],
2867 });
2868 let normalized = connector_normalized(
2869 "installation_repositories",
2870 "delivery-inst-repos",
2871 3001,
2872 Some("removed"),
2873 original.clone(),
2874 serde_json::json!({
2875 "installation": original["installation"].clone(),
2876 "account": {"login": "octo-org"},
2877 "installation_state": "revoked",
2878 "suspended": false,
2879 "revoked": true,
2880 "repository_selection": "selected",
2881 "repositories_added": [],
2882 "repositories_removed": original["repositories_removed"].clone(),
2883 }),
2884 );
2885 let provider = ProviderId::from("github");
2886 let payload = ProviderPayload::normalize(
2887 &provider,
2888 "installation_repositories",
2889 &github_headers("installation_repositories", "delivery-inst-repos"),
2890 normalized,
2891 )
2892 .expect("installation_repositories payload");
2893 let GitHubEventPayload::InstallationRepositories(repos) = unwrap_github(payload) else {
2894 panic!("expected InstallationRepositories variant");
2895 };
2896 assert_eq!(repos.common.event, "installation_repositories");
2897 assert_eq!(repos.common.action.as_deref(), Some("removed"));
2898 assert_eq!(repos.repository_selection.as_deref(), Some("selected"));
2899 assert!(repos.repositories_added.is_empty());
2900 assert_eq!(repos.repositories_removed.len(), 1);
2901 assert_eq!(
2902 repos.repositories_removed[0]
2903 .get("full_name")
2904 .and_then(|v| v.as_str()),
2905 Some("octo-org/octo-repo"),
2906 );
2907 assert_eq!(repos.installation_state.as_deref(), Some("revoked"));
2908 assert_eq!(repos.revoked, Some(true));
2909 }
2910
2911 #[test]
2912 fn github_legacy_direct_webhook_still_normalizes() {
2913 let provider = ProviderId::from("github");
2917 let payload = ProviderPayload::normalize(
2918 &provider,
2919 "issues",
2920 &github_headers("issues", "delivery-legacy"),
2921 serde_json::json!({
2922 "action": "opened",
2923 "installation": {"id": 99},
2924 "issue": {"number": 7},
2925 }),
2926 )
2927 .expect("legacy issues payload");
2928 let GitHubEventPayload::Issues(issues) = unwrap_github(payload) else {
2929 panic!("expected Issues variant");
2930 };
2931 assert_eq!(issues.common.installation_id, Some(99));
2932 assert_eq!(
2933 issues.common.delivery_id.as_deref(),
2934 Some("delivery-legacy")
2935 );
2936 assert!(issues.common.topic.is_none());
2937 assert!(issues.common.repo.is_none());
2938 assert_eq!(issues.issue.get("number").and_then(|v| v.as_i64()), Some(7));
2939 }
2940
2941 #[test]
2942 fn github_new_event_variants_round_trip_through_serde() {
2943 let provider = ProviderId::from("github");
2948 let cases: &[(&str, serde_json::Value, &str)] = &[
2949 (
2950 "check_suite",
2951 serde_json::json!({
2952 "event": "check_suite",
2953 "check_suite": {"id": 1},
2954 "check_suite_id": 1,
2955 "raw": {"check_suite": {"id": 1}},
2956 }),
2957 "CheckSuite",
2958 ),
2959 (
2960 "status",
2961 serde_json::json!({
2962 "event": "status",
2963 "commit_status": {"id": 9},
2964 "status_id": 9,
2965 "state": "success",
2966 "raw": {"id": 9, "state": "success"},
2967 }),
2968 "Status",
2969 ),
2970 (
2971 "merge_group",
2972 serde_json::json!({
2973 "event": "merge_group",
2974 "merge_group": {"id": 1},
2975 "merge_group_id": 1,
2976 "raw": {"merge_group": {"id": 1}},
2977 }),
2978 "MergeGroup",
2979 ),
2980 (
2981 "installation",
2982 serde_json::json!({
2983 "event": "installation",
2984 "installation": {"id": 1},
2985 "installation_state": "active",
2986 "suspended": false,
2987 "raw": {"installation": {"id": 1}},
2988 }),
2989 "Installation",
2990 ),
2991 (
2992 "installation_repositories",
2993 serde_json::json!({
2994 "event": "installation_repositories",
2995 "installation": {"id": 1},
2996 "repository_selection": "selected",
2997 "repositories_added": [],
2998 "repositories_removed": [{"id": 7}],
2999 "raw": {"installation": {"id": 1}},
3000 }),
3001 "InstallationRepositories",
3002 ),
3003 ];
3004 for (kind, raw, want_variant) in cases {
3005 let payload = ProviderPayload::normalize(
3006 &provider,
3007 kind,
3008 &github_headers(kind, "delivery"),
3009 raw.clone(),
3010 )
3011 .unwrap_or_else(|_| panic!("normalize {kind}"));
3012 let serialized = serde_json::to_value(&payload).expect("serialize");
3013 let deserialized: ProviderPayload =
3014 serde_json::from_value(serialized.clone()).expect("deserialize");
3015 let actual_variant = match unwrap_github(deserialized) {
3016 GitHubEventPayload::Issues(_) => "Issues",
3017 GitHubEventPayload::PullRequest(_) => "PullRequest",
3018 GitHubEventPayload::IssueComment(_) => "IssueComment",
3019 GitHubEventPayload::PullRequestReview(_) => "PullRequestReview",
3020 GitHubEventPayload::Push(_) => "Push",
3021 GitHubEventPayload::WorkflowRun(_) => "WorkflowRun",
3022 GitHubEventPayload::DeploymentStatus(_) => "DeploymentStatus",
3023 GitHubEventPayload::CheckRun(_) => "CheckRun",
3024 GitHubEventPayload::CheckSuite(_) => "CheckSuite",
3025 GitHubEventPayload::Status(_) => "Status",
3026 GitHubEventPayload::MergeGroup(_) => "MergeGroup",
3027 GitHubEventPayload::Installation(_) => "Installation",
3028 GitHubEventPayload::InstallationRepositories(_) => "InstallationRepositories",
3029 GitHubEventPayload::Other(_) => "Other",
3030 };
3031 assert_eq!(
3032 actual_variant, *want_variant,
3033 "{kind} round-tripped as {actual_variant}, expected {want_variant}; serialized form: {serialized}"
3034 );
3035 }
3036 }
3037
3038 #[test]
3039 fn provider_normalizes_stream_payloads() {
3040 let payload = ProviderPayload::normalize(
3041 &ProviderId::from("kafka"),
3042 "quote.tick",
3043 &BTreeMap::from([("x-source".to_string(), "feed".to_string())]),
3044 serde_json::json!({
3045 "topic": "quotes",
3046 "partition": 7,
3047 "offset": "42",
3048 "key": "AAPL",
3049 "timestamp": "2026-04-21T12:00:00Z"
3050 }),
3051 )
3052 .expect("stream payload");
3053 let ProviderPayload::Known(KnownProviderPayload::Kafka(payload)) = payload else {
3054 panic!("expected kafka stream payload")
3055 };
3056 assert_eq!(payload.event, "quote.tick");
3057 assert_eq!(payload.stream.as_deref(), Some("quotes"));
3058 assert_eq!(payload.partition.as_deref(), Some("7"));
3059 assert_eq!(payload.offset.as_deref(), Some("42"));
3060 assert_eq!(payload.key.as_deref(), Some("AAPL"));
3061 assert_eq!(payload.timestamp.as_deref(), Some("2026-04-21T12:00:00Z"));
3062 }
3063}