1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
11const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
14#[serde(transparent)]
15pub struct TriggerEventId(pub String);
16
17impl TriggerEventId {
18 pub fn new() -> Self {
19 Self(format!("trigger_evt_{}", Uuid::now_v7()))
20 }
21}
22
23impl Default for TriggerEventId {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
30#[serde(transparent)]
31pub struct ProviderId(pub String);
32
33impl ProviderId {
34 pub fn new(value: impl Into<String>) -> Self {
35 Self(value.into())
36 }
37
38 pub fn as_str(&self) -> &str {
39 self.0.as_str()
40 }
41}
42
43impl From<&str> for ProviderId {
44 fn from(value: &str) -> Self {
45 Self::new(value)
46 }
47}
48
49impl From<String> for ProviderId {
50 fn from(value: String) -> Self {
51 Self::new(value)
52 }
53}
54
55#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
56#[serde(transparent)]
57pub struct TraceId(pub String);
58
59impl TraceId {
60 pub fn new() -> Self {
61 Self(format!("trace_{}", Uuid::now_v7()))
62 }
63}
64
65impl Default for TraceId {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
72#[serde(transparent)]
73pub struct TenantId(pub String);
74
75impl TenantId {
76 pub fn new(value: impl Into<String>) -> Self {
77 Self(value.into())
78 }
79}
80
81#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
82#[serde(tag = "state", rename_all = "snake_case")]
83pub enum SignatureStatus {
84 Verified,
85 Unsigned,
86 Failed { reason: String },
87}
88
89#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
90pub struct GitHubEventCommon {
91 pub event: String,
92 pub action: Option<String>,
93 pub delivery_id: Option<String>,
94 pub installation_id: Option<i64>,
95 pub raw: JsonValue,
96}
97
98#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
99pub struct GitHubIssuesEventPayload {
100 #[serde(flatten)]
101 pub common: GitHubEventCommon,
102 pub issue: JsonValue,
103}
104
105#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
106pub struct GitHubPullRequestEventPayload {
107 #[serde(flatten)]
108 pub common: GitHubEventCommon,
109 pub pull_request: JsonValue,
110}
111
112#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
113pub struct GitHubIssueCommentEventPayload {
114 #[serde(flatten)]
115 pub common: GitHubEventCommon,
116 pub issue: JsonValue,
117 pub comment: JsonValue,
118}
119
120#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
121pub struct GitHubPullRequestReviewEventPayload {
122 #[serde(flatten)]
123 pub common: GitHubEventCommon,
124 pub pull_request: JsonValue,
125 pub review: JsonValue,
126}
127
128#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
129pub struct GitHubPushEventPayload {
130 #[serde(flatten)]
131 pub common: GitHubEventCommon,
132 #[serde(default)]
133 pub commits: Vec<JsonValue>,
134 pub distinct_size: Option<i64>,
135}
136
137#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
138pub struct GitHubWorkflowRunEventPayload {
139 #[serde(flatten)]
140 pub common: GitHubEventCommon,
141 pub workflow_run: JsonValue,
142}
143
144#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
145#[serde(untagged)]
146pub enum GitHubEventPayload {
147 Issues(GitHubIssuesEventPayload),
148 PullRequest(GitHubPullRequestEventPayload),
149 IssueComment(GitHubIssueCommentEventPayload),
150 PullRequestReview(GitHubPullRequestReviewEventPayload),
151 Push(GitHubPushEventPayload),
152 WorkflowRun(GitHubWorkflowRunEventPayload),
153 Other(GitHubEventCommon),
154}
155
156#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
157pub struct SlackEventCommon {
158 pub event: String,
159 pub event_id: Option<String>,
160 pub api_app_id: Option<String>,
161 pub team_id: Option<String>,
162 pub channel_id: Option<String>,
163 pub user_id: Option<String>,
164 pub event_ts: Option<String>,
165 pub raw: JsonValue,
166}
167
168#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
169pub struct SlackMessageEventPayload {
170 #[serde(flatten)]
171 pub common: SlackEventCommon,
172 pub subtype: Option<String>,
173 pub channel_type: Option<String>,
174 pub channel: Option<String>,
175 pub user: Option<String>,
176 pub text: Option<String>,
177 pub ts: Option<String>,
178 pub thread_ts: Option<String>,
179}
180
181#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
182pub struct SlackAppMentionEventPayload {
183 #[serde(flatten)]
184 pub common: SlackEventCommon,
185 pub channel: Option<String>,
186 pub user: Option<String>,
187 pub text: Option<String>,
188 pub ts: Option<String>,
189 pub thread_ts: Option<String>,
190}
191
192#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
193pub struct SlackReactionAddedEventPayload {
194 #[serde(flatten)]
195 pub common: SlackEventCommon,
196 pub reaction: Option<String>,
197 pub item_user: Option<String>,
198 pub item: JsonValue,
199}
200
201#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
202pub struct SlackAppHomeOpenedEventPayload {
203 #[serde(flatten)]
204 pub common: SlackEventCommon,
205 pub user: Option<String>,
206 pub channel: Option<String>,
207 pub tab: Option<String>,
208 pub view: JsonValue,
209}
210
211#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
212pub struct SlackAssistantThreadStartedEventPayload {
213 #[serde(flatten)]
214 pub common: SlackEventCommon,
215 pub assistant_thread: JsonValue,
216 pub thread_ts: Option<String>,
217 pub context: JsonValue,
218}
219
220#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
221#[serde(untagged)]
222pub enum SlackEventPayload {
223 Message(SlackMessageEventPayload),
224 AppMention(SlackAppMentionEventPayload),
225 ReactionAdded(SlackReactionAddedEventPayload),
226 AppHomeOpened(SlackAppHomeOpenedEventPayload),
227 AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
228 Other(SlackEventCommon),
229}
230
231#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
232pub struct LinearEventCommon {
233 pub event: String,
234 pub action: Option<String>,
235 pub delivery_id: Option<String>,
236 pub organization_id: Option<String>,
237 pub webhook_timestamp: Option<i64>,
238 pub webhook_id: Option<String>,
239 pub url: Option<String>,
240 pub created_at: Option<String>,
241 pub actor: JsonValue,
242 pub raw: JsonValue,
243}
244
245#[derive(Clone, Debug, PartialEq)]
246pub enum LinearIssueChange {
247 Title { previous: Option<String> },
248 Description { previous: Option<String> },
249 Priority { previous: Option<i64> },
250 Estimate { previous: Option<i64> },
251 StateId { previous: Option<String> },
252 TeamId { previous: Option<String> },
253 AssigneeId { previous: Option<String> },
254 ProjectId { previous: Option<String> },
255 CycleId { previous: Option<String> },
256 DueDate { previous: Option<String> },
257 ParentId { previous: Option<String> },
258 SortOrder { previous: Option<f64> },
259 LabelIds { previous: Vec<String> },
260 CompletedAt { previous: Option<String> },
261 Other { field: String, previous: JsonValue },
262}
263
264impl Serialize for LinearIssueChange {
265 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
266 where
267 S: Serializer,
268 {
269 let value = match self {
270 Self::Title { previous } => {
271 serde_json::json!({ "field_name": "title", "previous": previous })
272 }
273 Self::Description { previous } => {
274 serde_json::json!({ "field_name": "description", "previous": previous })
275 }
276 Self::Priority { previous } => {
277 serde_json::json!({ "field_name": "priority", "previous": previous })
278 }
279 Self::Estimate { previous } => {
280 serde_json::json!({ "field_name": "estimate", "previous": previous })
281 }
282 Self::StateId { previous } => {
283 serde_json::json!({ "field_name": "state_id", "previous": previous })
284 }
285 Self::TeamId { previous } => {
286 serde_json::json!({ "field_name": "team_id", "previous": previous })
287 }
288 Self::AssigneeId { previous } => {
289 serde_json::json!({ "field_name": "assignee_id", "previous": previous })
290 }
291 Self::ProjectId { previous } => {
292 serde_json::json!({ "field_name": "project_id", "previous": previous })
293 }
294 Self::CycleId { previous } => {
295 serde_json::json!({ "field_name": "cycle_id", "previous": previous })
296 }
297 Self::DueDate { previous } => {
298 serde_json::json!({ "field_name": "due_date", "previous": previous })
299 }
300 Self::ParentId { previous } => {
301 serde_json::json!({ "field_name": "parent_id", "previous": previous })
302 }
303 Self::SortOrder { previous } => {
304 serde_json::json!({ "field_name": "sort_order", "previous": previous })
305 }
306 Self::LabelIds { previous } => {
307 serde_json::json!({ "field_name": "label_ids", "previous": previous })
308 }
309 Self::CompletedAt { previous } => {
310 serde_json::json!({ "field_name": "completed_at", "previous": previous })
311 }
312 Self::Other { field, previous } => {
313 serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
314 }
315 };
316 value.serialize(serializer)
317 }
318}
319
320impl<'de> Deserialize<'de> for LinearIssueChange {
321 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
322 where
323 D: Deserializer<'de>,
324 {
325 let value = JsonValue::deserialize(deserializer)?;
326 let field_name = value
327 .get("field_name")
328 .and_then(JsonValue::as_str)
329 .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
330 let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
331 Ok(match field_name {
332 "title" => Self::Title {
333 previous: previous.as_str().map(ToString::to_string),
334 },
335 "description" => Self::Description {
336 previous: previous.as_str().map(ToString::to_string),
337 },
338 "priority" => Self::Priority {
339 previous: parse_json_i64ish(&previous),
340 },
341 "estimate" => Self::Estimate {
342 previous: parse_json_i64ish(&previous),
343 },
344 "state_id" => Self::StateId {
345 previous: previous.as_str().map(ToString::to_string),
346 },
347 "team_id" => Self::TeamId {
348 previous: previous.as_str().map(ToString::to_string),
349 },
350 "assignee_id" => Self::AssigneeId {
351 previous: previous.as_str().map(ToString::to_string),
352 },
353 "project_id" => Self::ProjectId {
354 previous: previous.as_str().map(ToString::to_string),
355 },
356 "cycle_id" => Self::CycleId {
357 previous: previous.as_str().map(ToString::to_string),
358 },
359 "due_date" => Self::DueDate {
360 previous: previous.as_str().map(ToString::to_string),
361 },
362 "parent_id" => Self::ParentId {
363 previous: previous.as_str().map(ToString::to_string),
364 },
365 "sort_order" => Self::SortOrder {
366 previous: previous.as_f64(),
367 },
368 "label_ids" => Self::LabelIds {
369 previous: parse_string_array(&previous),
370 },
371 "completed_at" => Self::CompletedAt {
372 previous: previous.as_str().map(ToString::to_string),
373 },
374 "other" => Self::Other {
375 field: value
376 .get("field")
377 .and_then(JsonValue::as_str)
378 .map(ToString::to_string)
379 .unwrap_or_else(|| "unknown".to_string()),
380 previous,
381 },
382 other => Self::Other {
383 field: other.to_string(),
384 previous,
385 },
386 })
387 }
388}
389
390#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
391pub struct LinearIssueEventPayload {
392 #[serde(flatten)]
393 pub common: LinearEventCommon,
394 pub issue: JsonValue,
395 #[serde(default)]
396 pub changes: Vec<LinearIssueChange>,
397}
398
399#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
400pub struct LinearIssueCommentEventPayload {
401 #[serde(flatten)]
402 pub common: LinearEventCommon,
403 pub comment: JsonValue,
404}
405
406#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
407pub struct LinearIssueLabelEventPayload {
408 #[serde(flatten)]
409 pub common: LinearEventCommon,
410 pub label: JsonValue,
411}
412
413#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
414pub struct LinearProjectEventPayload {
415 #[serde(flatten)]
416 pub common: LinearEventCommon,
417 pub project: JsonValue,
418}
419
420#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
421pub struct LinearCycleEventPayload {
422 #[serde(flatten)]
423 pub common: LinearEventCommon,
424 pub cycle: JsonValue,
425}
426
427#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
428pub struct LinearCustomerEventPayload {
429 #[serde(flatten)]
430 pub common: LinearEventCommon,
431 pub customer: JsonValue,
432}
433
434#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
435pub struct LinearCustomerRequestEventPayload {
436 #[serde(flatten)]
437 pub common: LinearEventCommon,
438 pub customer_request: JsonValue,
439}
440
441#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
442#[serde(untagged)]
443pub enum LinearEventPayload {
444 Issue(LinearIssueEventPayload),
445 IssueComment(LinearIssueCommentEventPayload),
446 IssueLabel(LinearIssueLabelEventPayload),
447 Project(LinearProjectEventPayload),
448 Cycle(LinearCycleEventPayload),
449 Customer(LinearCustomerEventPayload),
450 CustomerRequest(LinearCustomerRequestEventPayload),
451 Other(LinearEventCommon),
452}
453
454#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
455pub struct NotionPolledChangeEvent {
456 pub resource: String,
457 pub source_id: String,
458 pub entity_id: String,
459 pub high_water_mark: String,
460 pub before: Option<JsonValue>,
461 pub after: JsonValue,
462}
463
464#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
465pub struct NotionEventPayload {
466 pub event: String,
467 pub workspace_id: Option<String>,
468 pub request_id: Option<String>,
469 pub subscription_id: Option<String>,
470 pub integration_id: Option<String>,
471 pub attempt_number: Option<u32>,
472 pub entity_id: Option<String>,
473 pub entity_type: Option<String>,
474 pub api_version: Option<String>,
475 pub verification_token: Option<String>,
476 pub polled: Option<NotionPolledChangeEvent>,
477 pub raw: JsonValue,
478}
479
480#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
481pub struct CronEventPayload {
482 pub cron_id: Option<String>,
483 pub schedule: Option<String>,
484 #[serde(with = "time::serde::rfc3339")]
485 pub tick_at: OffsetDateTime,
486 pub raw: JsonValue,
487}
488
489#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
490pub struct GenericWebhookPayload {
491 pub source: Option<String>,
492 pub content_type: Option<String>,
493 pub raw: JsonValue,
494}
495
496#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
497pub struct A2aPushPayload {
498 pub task_id: Option<String>,
499 pub sender: Option<String>,
500 pub raw: JsonValue,
501}
502
503#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
504pub struct ExtensionProviderPayload {
505 pub provider: String,
506 pub schema_name: String,
507 pub raw: JsonValue,
508}
509
510#[allow(clippy::large_enum_variant)]
511#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
512#[serde(untagged)]
513pub enum ProviderPayload {
514 Known(KnownProviderPayload),
515 Extension(ExtensionProviderPayload),
516}
517
518impl ProviderPayload {
519 pub fn provider(&self) -> &str {
520 match self {
521 Self::Known(known) => known.provider(),
522 Self::Extension(payload) => payload.provider.as_str(),
523 }
524 }
525
526 pub fn normalize(
527 provider: &ProviderId,
528 kind: &str,
529 headers: &BTreeMap<String, String>,
530 raw: JsonValue,
531 ) -> Result<Self, ProviderCatalogError> {
532 provider_catalog()
533 .read()
534 .expect("provider catalog poisoned")
535 .normalize(provider, kind, headers, raw)
536 }
537}
538
539#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
540#[serde(tag = "provider")]
541pub enum KnownProviderPayload {
542 #[serde(rename = "github")]
543 GitHub(GitHubEventPayload),
544 #[serde(rename = "slack")]
545 Slack(Box<SlackEventPayload>),
546 #[serde(rename = "linear")]
547 Linear(LinearEventPayload),
548 #[serde(rename = "notion")]
549 Notion(Box<NotionEventPayload>),
550 #[serde(rename = "cron")]
551 Cron(CronEventPayload),
552 #[serde(rename = "webhook")]
553 Webhook(GenericWebhookPayload),
554 #[serde(rename = "a2a-push")]
555 A2aPush(A2aPushPayload),
556}
557
558impl KnownProviderPayload {
559 pub fn provider(&self) -> &str {
560 match self {
561 Self::GitHub(_) => "github",
562 Self::Slack(_) => "slack",
563 Self::Linear(_) => "linear",
564 Self::Notion(_) => "notion",
565 Self::Cron(_) => "cron",
566 Self::Webhook(_) => "webhook",
567 Self::A2aPush(_) => "a2a-push",
568 }
569 }
570}
571
572#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
573pub struct TriggerEvent {
574 pub id: TriggerEventId,
575 pub provider: ProviderId,
576 pub kind: String,
577 #[serde(with = "time::serde::rfc3339")]
578 pub received_at: OffsetDateTime,
579 #[serde(with = "time::serde::rfc3339::option")]
580 pub occurred_at: Option<OffsetDateTime>,
581 pub dedupe_key: String,
582 pub trace_id: TraceId,
583 pub tenant_id: Option<TenantId>,
584 pub headers: BTreeMap<String, String>,
585 #[serde(default, skip_serializing_if = "Option::is_none")]
586 pub batch: Option<Vec<JsonValue>>,
587 pub provider_payload: ProviderPayload,
588 pub signature_status: SignatureStatus,
589 #[serde(skip)]
590 pub dedupe_claimed: bool,
591}
592
593impl TriggerEvent {
594 #[allow(clippy::too_many_arguments)]
595 pub fn new(
596 provider: ProviderId,
597 kind: impl Into<String>,
598 occurred_at: Option<OffsetDateTime>,
599 dedupe_key: impl Into<String>,
600 tenant_id: Option<TenantId>,
601 headers: BTreeMap<String, String>,
602 provider_payload: ProviderPayload,
603 signature_status: SignatureStatus,
604 ) -> Self {
605 Self {
606 id: TriggerEventId::new(),
607 provider,
608 kind: kind.into(),
609 received_at: clock::now_utc(),
610 occurred_at,
611 dedupe_key: dedupe_key.into(),
612 trace_id: TraceId::new(),
613 tenant_id,
614 headers,
615 batch: None,
616 provider_payload,
617 signature_status,
618 dedupe_claimed: false,
619 }
620 }
621
622 pub fn dedupe_claimed(&self) -> bool {
623 self.dedupe_claimed
624 }
625
626 pub fn mark_dedupe_claimed(&mut self) {
627 self.dedupe_claimed = true;
628 }
629}
630
631#[derive(Clone, Debug, PartialEq, Eq)]
632pub struct HeaderRedactionPolicy {
633 safe_exact_names: BTreeSet<String>,
634}
635
636impl HeaderRedactionPolicy {
637 pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
638 self.safe_exact_names
639 .insert(name.into().to_ascii_lowercase());
640 self
641 }
642
643 fn should_keep(&self, name: &str) -> bool {
644 let lower = name.to_ascii_lowercase();
645 if self.safe_exact_names.contains(lower.as_str()) {
646 return true;
647 }
648 matches!(
649 lower.as_str(),
650 "user-agent"
651 | "request-id"
652 | "x-request-id"
653 | "x-correlation-id"
654 | "content-type"
655 | "content-length"
656 | "x-github-event"
657 | "x-github-delivery"
658 | "x-github-hook-id"
659 | "x-hub-signature-256"
660 | "x-slack-request-timestamp"
661 | "x-slack-signature"
662 | "x-linear-signature"
663 | "x-notion-signature"
664 | "x-a2a-signature"
665 | "x-a2a-delivery"
666 ) || lower.ends_with("-event")
667 || lower.ends_with("-delivery")
668 || lower.contains("timestamp")
669 || lower.contains("request-id")
670 }
671
672 fn should_redact(&self, name: &str) -> bool {
673 let lower = name.to_ascii_lowercase();
674 if self.should_keep(lower.as_str()) {
675 return false;
676 }
677 lower.contains("authorization")
678 || lower.contains("cookie")
679 || lower.contains("secret")
680 || lower.contains("token")
681 || lower.contains("key")
682 }
683}
684
685impl Default for HeaderRedactionPolicy {
686 fn default() -> Self {
687 Self {
688 safe_exact_names: BTreeSet::from([
689 "content-length".to_string(),
690 "content-type".to_string(),
691 "request-id".to_string(),
692 "user-agent".to_string(),
693 "x-a2a-delivery".to_string(),
694 "x-a2a-signature".to_string(),
695 "x-correlation-id".to_string(),
696 "x-github-delivery".to_string(),
697 "x-github-event".to_string(),
698 "x-github-hook-id".to_string(),
699 "x-hub-signature-256".to_string(),
700 "x-linear-signature".to_string(),
701 "x-notion-signature".to_string(),
702 "x-request-id".to_string(),
703 "x-slack-request-timestamp".to_string(),
704 "x-slack-signature".to_string(),
705 ]),
706 }
707 }
708}
709
710pub fn redact_headers(
711 headers: &BTreeMap<String, String>,
712 policy: &HeaderRedactionPolicy,
713) -> BTreeMap<String, String> {
714 headers
715 .iter()
716 .map(|(name, value)| {
717 if policy.should_redact(name) {
718 (name.clone(), REDACTED_HEADER_VALUE.to_string())
719 } else {
720 (name.clone(), value.clone())
721 }
722 })
723 .collect()
724}
725
726#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
727pub struct ProviderSecretRequirement {
728 pub name: String,
729 pub required: bool,
730 pub namespace: String,
731}
732
733#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
734pub struct ProviderOutboundMethod {
735 pub name: String,
736}
737
738#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
739#[serde(tag = "kind", rename_all = "snake_case")]
740pub enum SignatureVerificationMetadata {
741 #[default]
742 None,
743 Hmac {
744 variant: String,
745 raw_body: bool,
746 signature_header: String,
747 timestamp_header: Option<String>,
748 id_header: Option<String>,
749 default_tolerance_secs: Option<i64>,
750 digest: String,
751 encoding: String,
752 },
753}
754
755#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
756#[serde(tag = "kind", rename_all = "snake_case")]
757pub enum ProviderRuntimeMetadata {
758 Builtin {
759 connector: String,
760 default_signature_variant: Option<String>,
761 },
762 #[default]
763 Placeholder,
764}
765
766#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
767pub struct ProviderMetadata {
768 pub provider: String,
769 #[serde(default)]
770 pub kinds: Vec<String>,
771 pub schema_name: String,
772 #[serde(default)]
773 pub outbound_methods: Vec<ProviderOutboundMethod>,
774 #[serde(default)]
775 pub secret_requirements: Vec<ProviderSecretRequirement>,
776 #[serde(default)]
777 pub signature_verification: SignatureVerificationMetadata,
778 #[serde(default)]
779 pub runtime: ProviderRuntimeMetadata,
780}
781
782impl ProviderMetadata {
783 pub fn supports_kind(&self, kind: &str) -> bool {
784 self.kinds.iter().any(|candidate| candidate == kind)
785 }
786
787 pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
788 self.secret_requirements
789 .iter()
790 .filter(|requirement| requirement.required)
791 .map(|requirement| requirement.name.as_str())
792 }
793}
794
795pub trait ProviderSchema: Send + Sync {
796 fn provider_id(&self) -> &'static str;
797 fn harn_schema_name(&self) -> &'static str;
798 fn metadata(&self) -> ProviderMetadata {
799 ProviderMetadata {
800 provider: self.provider_id().to_string(),
801 schema_name: self.harn_schema_name().to_string(),
802 ..ProviderMetadata::default()
803 }
804 }
805 fn normalize(
806 &self,
807 kind: &str,
808 headers: &BTreeMap<String, String>,
809 raw: JsonValue,
810 ) -> Result<ProviderPayload, ProviderCatalogError>;
811}
812
813#[derive(Clone, Debug, PartialEq, Eq)]
814pub enum ProviderCatalogError {
815 DuplicateProvider(String),
816 UnknownProvider(String),
817}
818
819impl std::fmt::Display for ProviderCatalogError {
820 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
821 match self {
822 Self::DuplicateProvider(provider) => {
823 write!(f, "provider `{provider}` is already registered")
824 }
825 Self::UnknownProvider(provider) => write!(f, "provider `{provider}` is not registered"),
826 }
827 }
828}
829
830impl std::error::Error for ProviderCatalogError {}
831
832#[derive(Clone, Default)]
833pub struct ProviderCatalog {
834 providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
835}
836
837impl ProviderCatalog {
838 pub fn with_defaults() -> Self {
839 let mut catalog = Self::default();
840 for schema in default_provider_schemas() {
841 catalog
842 .register(schema)
843 .expect("default providers must register cleanly");
844 }
845 catalog
846 }
847
848 pub fn register(
849 &mut self,
850 schema: Arc<dyn ProviderSchema>,
851 ) -> Result<(), ProviderCatalogError> {
852 let provider = schema.provider_id().to_string();
853 if self.providers.contains_key(provider.as_str()) {
854 return Err(ProviderCatalogError::DuplicateProvider(provider));
855 }
856 self.providers.insert(provider, schema);
857 Ok(())
858 }
859
860 pub fn normalize(
861 &self,
862 provider: &ProviderId,
863 kind: &str,
864 headers: &BTreeMap<String, String>,
865 raw: JsonValue,
866 ) -> Result<ProviderPayload, ProviderCatalogError> {
867 let schema = self
868 .providers
869 .get(provider.as_str())
870 .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
871 schema.normalize(kind, headers, raw)
872 }
873
874 pub fn schema_names(&self) -> BTreeMap<String, String> {
875 self.providers
876 .iter()
877 .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
878 .collect()
879 }
880
881 pub fn entries(&self) -> Vec<ProviderMetadata> {
882 self.providers
883 .values()
884 .map(|schema| schema.metadata())
885 .collect()
886 }
887
888 pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
889 self.providers.get(provider).map(|schema| schema.metadata())
890 }
891}
892
893pub fn register_provider_schema(
894 schema: Arc<dyn ProviderSchema>,
895) -> Result<(), ProviderCatalogError> {
896 provider_catalog()
897 .write()
898 .expect("provider catalog poisoned")
899 .register(schema)
900}
901
902pub fn reset_provider_catalog() {
903 *provider_catalog()
904 .write()
905 .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
906}
907
908pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
909 provider_catalog()
910 .read()
911 .expect("provider catalog poisoned")
912 .schema_names()
913}
914
915pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
916 provider_catalog()
917 .read()
918 .expect("provider catalog poisoned")
919 .entries()
920}
921
922pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
923 provider_catalog()
924 .read()
925 .expect("provider catalog poisoned")
926 .metadata_for(provider)
927}
928
929fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
930 static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
931 PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
932}
933
934struct BuiltinProviderSchema {
935 provider_id: &'static str,
936 harn_schema_name: &'static str,
937 metadata: ProviderMetadata,
938 normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
939}
940
941impl ProviderSchema for BuiltinProviderSchema {
942 fn provider_id(&self) -> &'static str {
943 self.provider_id
944 }
945
946 fn harn_schema_name(&self) -> &'static str {
947 self.harn_schema_name
948 }
949
950 fn metadata(&self) -> ProviderMetadata {
951 self.metadata.clone()
952 }
953
954 fn normalize(
955 &self,
956 kind: &str,
957 headers: &BTreeMap<String, String>,
958 raw: JsonValue,
959 ) -> Result<ProviderPayload, ProviderCatalogError> {
960 Ok((self.normalize)(kind, headers, raw))
961 }
962}
963
964fn provider_metadata_entry(
965 provider: &str,
966 kinds: &[&str],
967 schema_name: &str,
968 outbound_methods: &[&str],
969 signature_verification: SignatureVerificationMetadata,
970 secret_requirements: Vec<ProviderSecretRequirement>,
971 runtime: ProviderRuntimeMetadata,
972) -> ProviderMetadata {
973 ProviderMetadata {
974 provider: provider.to_string(),
975 kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
976 schema_name: schema_name.to_string(),
977 outbound_methods: outbound_methods
978 .iter()
979 .map(|name| ProviderOutboundMethod {
980 name: (*name).to_string(),
981 })
982 .collect(),
983 secret_requirements,
984 signature_verification,
985 runtime,
986 }
987}
988
989fn hmac_signature_metadata(
990 variant: &str,
991 signature_header: &str,
992 timestamp_header: Option<&str>,
993 id_header: Option<&str>,
994 default_tolerance_secs: Option<i64>,
995 encoding: &str,
996) -> SignatureVerificationMetadata {
997 SignatureVerificationMetadata::Hmac {
998 variant: variant.to_string(),
999 raw_body: true,
1000 signature_header: signature_header.to_string(),
1001 timestamp_header: timestamp_header.map(ToString::to_string),
1002 id_header: id_header.map(ToString::to_string),
1003 default_tolerance_secs,
1004 digest: "sha256".to_string(),
1005 encoding: encoding.to_string(),
1006 }
1007}
1008
1009fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1010 ProviderSecretRequirement {
1011 name: name.to_string(),
1012 required: true,
1013 namespace: namespace.to_string(),
1014 }
1015}
1016
1017fn outbound_method(name: &str) -> ProviderOutboundMethod {
1018 ProviderOutboundMethod {
1019 name: name.to_string(),
1020 }
1021}
1022
1023fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1024 ProviderSecretRequirement {
1025 name: name.to_string(),
1026 required: false,
1027 namespace: namespace.to_string(),
1028 }
1029}
1030
1031fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1032 vec![
1033 Arc::new(BuiltinProviderSchema {
1034 provider_id: "github",
1035 harn_schema_name: "GitHubEventPayload",
1036 metadata: provider_metadata_entry(
1037 "github",
1038 &["webhook"],
1039 "GitHubEventPayload",
1040 &[],
1041 hmac_signature_metadata(
1042 "github",
1043 "X-Hub-Signature-256",
1044 None,
1045 Some("X-GitHub-Delivery"),
1046 None,
1047 "hex",
1048 ),
1049 vec![required_secret("signing_secret", "github")],
1050 ProviderRuntimeMetadata::Builtin {
1051 connector: "webhook".to_string(),
1052 default_signature_variant: Some("github".to_string()),
1053 },
1054 ),
1055 normalize: github_payload,
1056 }),
1057 Arc::new(BuiltinProviderSchema {
1058 provider_id: "slack",
1059 harn_schema_name: "SlackEventPayload",
1060 metadata: provider_metadata_entry(
1061 "slack",
1062 &["webhook"],
1063 "SlackEventPayload",
1064 &[
1065 "post_message",
1066 "update_message",
1067 "add_reaction",
1068 "open_view",
1069 "user_info",
1070 "api_call",
1071 "upload_file",
1072 ],
1073 hmac_signature_metadata(
1074 "slack",
1075 "X-Slack-Signature",
1076 Some("X-Slack-Request-Timestamp"),
1077 None,
1078 Some(300),
1079 "hex",
1080 ),
1081 vec![required_secret("signing_secret", "slack")],
1082 ProviderRuntimeMetadata::Builtin {
1083 connector: "slack".to_string(),
1084 default_signature_variant: Some("slack".to_string()),
1085 },
1086 ),
1087 normalize: slack_payload,
1088 }),
1089 Arc::new(BuiltinProviderSchema {
1090 provider_id: "linear",
1091 harn_schema_name: "LinearEventPayload",
1092 metadata: {
1093 let mut metadata = provider_metadata_entry(
1094 "linear",
1095 &["webhook"],
1096 "LinearEventPayload",
1097 &[],
1098 hmac_signature_metadata(
1099 "linear",
1100 "Linear-Signature",
1101 None,
1102 Some("Linear-Delivery"),
1103 Some(75),
1104 "hex",
1105 ),
1106 vec![
1107 required_secret("signing_secret", "linear"),
1108 optional_secret("access_token", "linear"),
1109 ],
1110 ProviderRuntimeMetadata::Builtin {
1111 connector: "linear".to_string(),
1112 default_signature_variant: Some("linear".to_string()),
1113 },
1114 );
1115 metadata.outbound_methods = vec![
1116 ProviderOutboundMethod {
1117 name: "list_issues".to_string(),
1118 },
1119 ProviderOutboundMethod {
1120 name: "update_issue".to_string(),
1121 },
1122 ProviderOutboundMethod {
1123 name: "create_comment".to_string(),
1124 },
1125 ProviderOutboundMethod {
1126 name: "search".to_string(),
1127 },
1128 ProviderOutboundMethod {
1129 name: "graphql".to_string(),
1130 },
1131 ];
1132 metadata
1133 },
1134 normalize: linear_payload,
1135 }),
1136 Arc::new(BuiltinProviderSchema {
1137 provider_id: "notion",
1138 harn_schema_name: "NotionEventPayload",
1139 metadata: {
1140 let mut metadata = provider_metadata_entry(
1141 "notion",
1142 &["webhook", "poll"],
1143 "NotionEventPayload",
1144 &[],
1145 hmac_signature_metadata(
1146 "notion",
1147 "X-Notion-Signature",
1148 None,
1149 None,
1150 None,
1151 "hex",
1152 ),
1153 vec![required_secret("verification_token", "notion")],
1154 ProviderRuntimeMetadata::Builtin {
1155 connector: "notion".to_string(),
1156 default_signature_variant: Some("notion".to_string()),
1157 },
1158 );
1159 metadata.outbound_methods = vec![
1160 outbound_method("get_page"),
1161 outbound_method("update_page"),
1162 outbound_method("append_blocks"),
1163 outbound_method("query_database"),
1164 outbound_method("search"),
1165 outbound_method("create_comment"),
1166 outbound_method("api_call"),
1167 ];
1168 metadata
1169 },
1170 normalize: notion_payload,
1171 }),
1172 Arc::new(BuiltinProviderSchema {
1173 provider_id: "cron",
1174 harn_schema_name: "CronEventPayload",
1175 metadata: provider_metadata_entry(
1176 "cron",
1177 &["cron"],
1178 "CronEventPayload",
1179 &[],
1180 SignatureVerificationMetadata::None,
1181 Vec::new(),
1182 ProviderRuntimeMetadata::Builtin {
1183 connector: "cron".to_string(),
1184 default_signature_variant: None,
1185 },
1186 ),
1187 normalize: cron_payload,
1188 }),
1189 Arc::new(BuiltinProviderSchema {
1190 provider_id: "webhook",
1191 harn_schema_name: "GenericWebhookPayload",
1192 metadata: provider_metadata_entry(
1193 "webhook",
1194 &["webhook"],
1195 "GenericWebhookPayload",
1196 &[],
1197 hmac_signature_metadata(
1198 "standard",
1199 "webhook-signature",
1200 Some("webhook-timestamp"),
1201 Some("webhook-id"),
1202 Some(300),
1203 "base64",
1204 ),
1205 vec![required_secret("signing_secret", "webhook")],
1206 ProviderRuntimeMetadata::Builtin {
1207 connector: "webhook".to_string(),
1208 default_signature_variant: Some("standard".to_string()),
1209 },
1210 ),
1211 normalize: webhook_payload,
1212 }),
1213 Arc::new(BuiltinProviderSchema {
1214 provider_id: "a2a-push",
1215 harn_schema_name: "A2aPushPayload",
1216 metadata: provider_metadata_entry(
1217 "a2a-push",
1218 &["a2a-push"],
1219 "A2aPushPayload",
1220 &[],
1221 SignatureVerificationMetadata::None,
1222 Vec::new(),
1223 ProviderRuntimeMetadata::Placeholder,
1224 ),
1225 normalize: a2a_push_payload,
1226 }),
1227 ]
1228}
1229
1230fn github_payload(
1231 kind: &str,
1232 headers: &BTreeMap<String, String>,
1233 raw: JsonValue,
1234) -> ProviderPayload {
1235 let common = GitHubEventCommon {
1236 event: kind.to_string(),
1237 action: raw
1238 .get("action")
1239 .and_then(JsonValue::as_str)
1240 .map(ToString::to_string),
1241 delivery_id: headers.get("X-GitHub-Delivery").cloned(),
1242 installation_id: raw
1243 .get("installation")
1244 .and_then(|value| value.get("id"))
1245 .and_then(JsonValue::as_i64),
1246 raw: raw.clone(),
1247 };
1248 let payload = match kind {
1249 "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1250 common,
1251 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1252 }),
1253 "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1254 common,
1255 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1256 }),
1257 "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1258 common,
1259 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1260 comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1261 }),
1262 "pull_request_review" => {
1263 GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1264 common,
1265 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1266 review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1267 })
1268 }
1269 "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1270 common,
1271 commits: raw
1272 .get("commits")
1273 .and_then(JsonValue::as_array)
1274 .cloned()
1275 .unwrap_or_default(),
1276 distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1277 }),
1278 "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1279 common,
1280 workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1281 }),
1282 _ => GitHubEventPayload::Other(common),
1283 };
1284 ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1285}
1286
1287fn slack_payload(
1288 kind: &str,
1289 _headers: &BTreeMap<String, String>,
1290 raw: JsonValue,
1291) -> ProviderPayload {
1292 let event = raw.get("event");
1293 let common = SlackEventCommon {
1294 event: kind.to_string(),
1295 event_id: raw
1296 .get("event_id")
1297 .and_then(JsonValue::as_str)
1298 .map(ToString::to_string),
1299 api_app_id: raw
1300 .get("api_app_id")
1301 .and_then(JsonValue::as_str)
1302 .map(ToString::to_string),
1303 team_id: raw
1304 .get("team_id")
1305 .and_then(JsonValue::as_str)
1306 .map(ToString::to_string),
1307 channel_id: slack_channel_id(event),
1308 user_id: slack_user_id(event),
1309 event_ts: event
1310 .and_then(|value| value.get("event_ts"))
1311 .and_then(JsonValue::as_str)
1312 .map(ToString::to_string),
1313 raw: raw.clone(),
1314 };
1315 let payload = match kind {
1316 kind if kind == "message" || kind.starts_with("message.") => {
1317 SlackEventPayload::Message(SlackMessageEventPayload {
1318 subtype: event
1319 .and_then(|value| value.get("subtype"))
1320 .and_then(JsonValue::as_str)
1321 .map(ToString::to_string),
1322 channel_type: event
1323 .and_then(|value| value.get("channel_type"))
1324 .and_then(JsonValue::as_str)
1325 .map(ToString::to_string),
1326 channel: event
1327 .and_then(|value| value.get("channel"))
1328 .and_then(JsonValue::as_str)
1329 .map(ToString::to_string),
1330 user: event
1331 .and_then(|value| value.get("user"))
1332 .and_then(JsonValue::as_str)
1333 .map(ToString::to_string),
1334 text: event
1335 .and_then(|value| value.get("text"))
1336 .and_then(JsonValue::as_str)
1337 .map(ToString::to_string),
1338 ts: event
1339 .and_then(|value| value.get("ts"))
1340 .and_then(JsonValue::as_str)
1341 .map(ToString::to_string),
1342 thread_ts: event
1343 .and_then(|value| value.get("thread_ts"))
1344 .and_then(JsonValue::as_str)
1345 .map(ToString::to_string),
1346 common,
1347 })
1348 }
1349 "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1350 channel: event
1351 .and_then(|value| value.get("channel"))
1352 .and_then(JsonValue::as_str)
1353 .map(ToString::to_string),
1354 user: event
1355 .and_then(|value| value.get("user"))
1356 .and_then(JsonValue::as_str)
1357 .map(ToString::to_string),
1358 text: event
1359 .and_then(|value| value.get("text"))
1360 .and_then(JsonValue::as_str)
1361 .map(ToString::to_string),
1362 ts: event
1363 .and_then(|value| value.get("ts"))
1364 .and_then(JsonValue::as_str)
1365 .map(ToString::to_string),
1366 thread_ts: event
1367 .and_then(|value| value.get("thread_ts"))
1368 .and_then(JsonValue::as_str)
1369 .map(ToString::to_string),
1370 common,
1371 }),
1372 "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1373 reaction: event
1374 .and_then(|value| value.get("reaction"))
1375 .and_then(JsonValue::as_str)
1376 .map(ToString::to_string),
1377 item_user: event
1378 .and_then(|value| value.get("item_user"))
1379 .and_then(JsonValue::as_str)
1380 .map(ToString::to_string),
1381 item: event
1382 .and_then(|value| value.get("item"))
1383 .cloned()
1384 .unwrap_or(JsonValue::Null),
1385 common,
1386 }),
1387 "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1388 user: event
1389 .and_then(|value| value.get("user"))
1390 .and_then(JsonValue::as_str)
1391 .map(ToString::to_string),
1392 channel: event
1393 .and_then(|value| value.get("channel"))
1394 .and_then(JsonValue::as_str)
1395 .map(ToString::to_string),
1396 tab: event
1397 .and_then(|value| value.get("tab"))
1398 .and_then(JsonValue::as_str)
1399 .map(ToString::to_string),
1400 view: event
1401 .and_then(|value| value.get("view"))
1402 .cloned()
1403 .unwrap_or(JsonValue::Null),
1404 common,
1405 }),
1406 "assistant_thread_started" => {
1407 let assistant_thread = event
1408 .and_then(|value| value.get("assistant_thread"))
1409 .cloned()
1410 .unwrap_or(JsonValue::Null);
1411 SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1412 thread_ts: assistant_thread
1413 .get("thread_ts")
1414 .and_then(JsonValue::as_str)
1415 .map(ToString::to_string),
1416 context: assistant_thread
1417 .get("context")
1418 .cloned()
1419 .unwrap_or(JsonValue::Null),
1420 assistant_thread,
1421 common,
1422 })
1423 }
1424 _ => SlackEventPayload::Other(common),
1425 };
1426 ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1427}
1428
1429fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1430 event
1431 .and_then(|value| value.get("channel"))
1432 .and_then(JsonValue::as_str)
1433 .map(ToString::to_string)
1434 .or_else(|| {
1435 event
1436 .and_then(|value| value.get("item"))
1437 .and_then(|value| value.get("channel"))
1438 .and_then(JsonValue::as_str)
1439 .map(ToString::to_string)
1440 })
1441 .or_else(|| {
1442 event
1443 .and_then(|value| value.get("channel"))
1444 .and_then(|value| value.get("id"))
1445 .and_then(JsonValue::as_str)
1446 .map(ToString::to_string)
1447 })
1448 .or_else(|| {
1449 event
1450 .and_then(|value| value.get("assistant_thread"))
1451 .and_then(|value| value.get("channel_id"))
1452 .and_then(JsonValue::as_str)
1453 .map(ToString::to_string)
1454 })
1455}
1456
1457fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1458 event
1459 .and_then(|value| value.get("user"))
1460 .and_then(JsonValue::as_str)
1461 .map(ToString::to_string)
1462 .or_else(|| {
1463 event
1464 .and_then(|value| value.get("user"))
1465 .and_then(|value| value.get("id"))
1466 .and_then(JsonValue::as_str)
1467 .map(ToString::to_string)
1468 })
1469 .or_else(|| {
1470 event
1471 .and_then(|value| value.get("item_user"))
1472 .and_then(JsonValue::as_str)
1473 .map(ToString::to_string)
1474 })
1475 .or_else(|| {
1476 event
1477 .and_then(|value| value.get("assistant_thread"))
1478 .and_then(|value| value.get("user_id"))
1479 .and_then(JsonValue::as_str)
1480 .map(ToString::to_string)
1481 })
1482}
1483
1484fn linear_payload(
1485 _kind: &str,
1486 headers: &BTreeMap<String, String>,
1487 raw: JsonValue,
1488) -> ProviderPayload {
1489 let common = linear_event_common(headers, &raw);
1490 let event = common.event.clone();
1491 let payload = match event.as_str() {
1492 "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1493 common,
1494 issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1495 changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1496 }),
1497 "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1498 common,
1499 comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1500 }),
1501 "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1502 common,
1503 label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1504 }),
1505 "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1506 common,
1507 project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1508 }),
1509 "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1510 common,
1511 cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1512 }),
1513 "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
1514 common,
1515 customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1516 }),
1517 "customer_request" => {
1518 LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
1519 common,
1520 customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1521 })
1522 }
1523 _ => LinearEventPayload::Other(common),
1524 };
1525 ProviderPayload::Known(KnownProviderPayload::Linear(payload))
1526}
1527
1528fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
1529 LinearEventCommon {
1530 event: linear_event_name(
1531 raw.get("type")
1532 .and_then(JsonValue::as_str)
1533 .or_else(|| headers.get("Linear-Event").map(String::as_str)),
1534 ),
1535 action: raw
1536 .get("action")
1537 .and_then(JsonValue::as_str)
1538 .map(ToString::to_string),
1539 delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
1540 organization_id: raw
1541 .get("organizationId")
1542 .and_then(JsonValue::as_str)
1543 .map(ToString::to_string),
1544 webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
1545 webhook_id: raw
1546 .get("webhookId")
1547 .and_then(JsonValue::as_str)
1548 .map(ToString::to_string),
1549 url: raw
1550 .get("url")
1551 .and_then(JsonValue::as_str)
1552 .map(ToString::to_string),
1553 created_at: raw
1554 .get("createdAt")
1555 .and_then(JsonValue::as_str)
1556 .map(ToString::to_string),
1557 actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
1558 raw: raw.clone(),
1559 }
1560}
1561
1562fn linear_event_name(raw_type: Option<&str>) -> String {
1563 match raw_type.unwrap_or_default().to_ascii_lowercase().as_str() {
1564 "issue" => "issue".to_string(),
1565 "comment" | "issuecomment" | "issue_comment" => "comment".to_string(),
1566 "issuelabel" | "issue_label" => "issue_label".to_string(),
1567 "project" | "projectupdate" | "project_update" => "project".to_string(),
1568 "cycle" => "cycle".to_string(),
1569 "customer" => "customer".to_string(),
1570 "customerrequest" | "customer_request" => "customer_request".to_string(),
1571 other if !other.is_empty() => other.to_string(),
1572 _ => "other".to_string(),
1573 }
1574}
1575
1576fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
1577 let Some(JsonValue::Object(fields)) = updated_from else {
1578 return Vec::new();
1579 };
1580 let mut changes = Vec::new();
1581 for (field, previous) in fields {
1582 let change = match field.as_str() {
1583 "title" => LinearIssueChange::Title {
1584 previous: previous.as_str().map(ToString::to_string),
1585 },
1586 "description" => LinearIssueChange::Description {
1587 previous: previous.as_str().map(ToString::to_string),
1588 },
1589 "priority" => LinearIssueChange::Priority {
1590 previous: parse_json_i64ish(previous),
1591 },
1592 "estimate" => LinearIssueChange::Estimate {
1593 previous: parse_json_i64ish(previous),
1594 },
1595 "stateId" => LinearIssueChange::StateId {
1596 previous: previous.as_str().map(ToString::to_string),
1597 },
1598 "teamId" => LinearIssueChange::TeamId {
1599 previous: previous.as_str().map(ToString::to_string),
1600 },
1601 "assigneeId" => LinearIssueChange::AssigneeId {
1602 previous: previous.as_str().map(ToString::to_string),
1603 },
1604 "projectId" => LinearIssueChange::ProjectId {
1605 previous: previous.as_str().map(ToString::to_string),
1606 },
1607 "cycleId" => LinearIssueChange::CycleId {
1608 previous: previous.as_str().map(ToString::to_string),
1609 },
1610 "dueDate" => LinearIssueChange::DueDate {
1611 previous: previous.as_str().map(ToString::to_string),
1612 },
1613 "parentId" => LinearIssueChange::ParentId {
1614 previous: previous.as_str().map(ToString::to_string),
1615 },
1616 "sortOrder" => LinearIssueChange::SortOrder {
1617 previous: previous.as_f64(),
1618 },
1619 "labelIds" => LinearIssueChange::LabelIds {
1620 previous: parse_string_array(previous),
1621 },
1622 "completedAt" => LinearIssueChange::CompletedAt {
1623 previous: previous.as_str().map(ToString::to_string),
1624 },
1625 _ => LinearIssueChange::Other {
1626 field: field.clone(),
1627 previous: previous.clone(),
1628 },
1629 };
1630 changes.push(change);
1631 }
1632 changes
1633}
1634
1635fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
1636 value
1637 .as_i64()
1638 .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
1639 .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
1640}
1641
1642fn parse_string_array(value: &JsonValue) -> Vec<String> {
1643 let Some(array) = value.as_array() else {
1644 return Vec::new();
1645 };
1646 array
1647 .iter()
1648 .filter_map(|entry| {
1649 entry.as_str().map(ToString::to_string).or_else(|| {
1650 entry
1651 .get("id")
1652 .and_then(JsonValue::as_str)
1653 .map(ToString::to_string)
1654 })
1655 })
1656 .collect()
1657}
1658
1659fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
1660 headers
1661 .iter()
1662 .find(|(key, _)| key.eq_ignore_ascii_case(name))
1663 .map(|(_, value)| value.as_str())
1664}
1665
1666fn notion_payload(
1667 kind: &str,
1668 headers: &BTreeMap<String, String>,
1669 raw: JsonValue,
1670) -> ProviderPayload {
1671 let workspace_id = raw
1672 .get("workspace_id")
1673 .and_then(JsonValue::as_str)
1674 .map(ToString::to_string);
1675 ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
1676 event: kind.to_string(),
1677 workspace_id,
1678 request_id: headers
1679 .get("request-id")
1680 .cloned()
1681 .or_else(|| headers.get("x-request-id").cloned()),
1682 subscription_id: raw
1683 .get("subscription_id")
1684 .and_then(JsonValue::as_str)
1685 .map(ToString::to_string),
1686 integration_id: raw
1687 .get("integration_id")
1688 .and_then(JsonValue::as_str)
1689 .map(ToString::to_string),
1690 attempt_number: raw
1691 .get("attempt_number")
1692 .and_then(JsonValue::as_u64)
1693 .and_then(|value| u32::try_from(value).ok()),
1694 entity_id: raw
1695 .get("entity")
1696 .and_then(|value| value.get("id"))
1697 .and_then(JsonValue::as_str)
1698 .map(ToString::to_string),
1699 entity_type: raw
1700 .get("entity")
1701 .and_then(|value| value.get("type"))
1702 .and_then(JsonValue::as_str)
1703 .map(ToString::to_string),
1704 api_version: raw
1705 .get("api_version")
1706 .and_then(JsonValue::as_str)
1707 .map(ToString::to_string),
1708 verification_token: raw
1709 .get("verification_token")
1710 .and_then(JsonValue::as_str)
1711 .map(ToString::to_string),
1712 polled: None,
1713 raw,
1714 })))
1715}
1716
1717fn cron_payload(
1718 _kind: &str,
1719 _headers: &BTreeMap<String, String>,
1720 raw: JsonValue,
1721) -> ProviderPayload {
1722 let cron_id = raw
1723 .get("cron_id")
1724 .and_then(JsonValue::as_str)
1725 .map(ToString::to_string);
1726 let schedule = raw
1727 .get("schedule")
1728 .and_then(JsonValue::as_str)
1729 .map(ToString::to_string);
1730 let tick_at = raw
1731 .get("tick_at")
1732 .and_then(JsonValue::as_str)
1733 .and_then(parse_rfc3339)
1734 .unwrap_or_else(OffsetDateTime::now_utc);
1735 ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
1736 cron_id,
1737 schedule,
1738 tick_at,
1739 raw,
1740 }))
1741}
1742
1743fn webhook_payload(
1744 _kind: &str,
1745 headers: &BTreeMap<String, String>,
1746 raw: JsonValue,
1747) -> ProviderPayload {
1748 ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1749 source: headers.get("X-Webhook-Source").cloned(),
1750 content_type: headers.get("Content-Type").cloned(),
1751 raw,
1752 }))
1753}
1754
1755fn a2a_push_payload(
1756 _kind: &str,
1757 _headers: &BTreeMap<String, String>,
1758 raw: JsonValue,
1759) -> ProviderPayload {
1760 let task_id = raw
1761 .get("task_id")
1762 .and_then(JsonValue::as_str)
1763 .map(ToString::to_string);
1764 let sender = raw
1765 .get("sender")
1766 .and_then(JsonValue::as_str)
1767 .map(ToString::to_string);
1768 ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
1769 task_id,
1770 sender,
1771 raw,
1772 }))
1773}
1774
1775fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
1776 OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
1777}
1778
1779#[cfg(test)]
1780mod tests {
1781 use super::*;
1782
1783 fn sample_headers() -> BTreeMap<String, String> {
1784 BTreeMap::from([
1785 ("Authorization".to_string(), "Bearer secret".to_string()),
1786 ("Cookie".to_string(), "session=abc".to_string()),
1787 ("User-Agent".to_string(), "GitHub-Hookshot/123".to_string()),
1788 ("X-GitHub-Delivery".to_string(), "delivery-123".to_string()),
1789 ("X-GitHub-Event".to_string(), "issues".to_string()),
1790 ("X-Webhook-Token".to_string(), "token".to_string()),
1791 ])
1792 }
1793
1794 #[test]
1795 fn default_redaction_policy_keeps_safe_headers() {
1796 let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
1797 assert_eq!(redacted.get("User-Agent").unwrap(), "GitHub-Hookshot/123");
1798 assert_eq!(redacted.get("X-GitHub-Delivery").unwrap(), "delivery-123");
1799 assert_eq!(
1800 redacted.get("Authorization").unwrap(),
1801 REDACTED_HEADER_VALUE
1802 );
1803 assert_eq!(redacted.get("Cookie").unwrap(), REDACTED_HEADER_VALUE);
1804 assert_eq!(
1805 redacted.get("X-Webhook-Token").unwrap(),
1806 REDACTED_HEADER_VALUE
1807 );
1808 }
1809
1810 #[test]
1811 fn provider_catalog_rejects_duplicates() {
1812 let mut catalog = ProviderCatalog::default();
1813 catalog
1814 .register(Arc::new(BuiltinProviderSchema {
1815 provider_id: "github",
1816 harn_schema_name: "GitHubEventPayload",
1817 metadata: provider_metadata_entry(
1818 "github",
1819 &["webhook"],
1820 "GitHubEventPayload",
1821 &[],
1822 SignatureVerificationMetadata::None,
1823 Vec::new(),
1824 ProviderRuntimeMetadata::Placeholder,
1825 ),
1826 normalize: github_payload,
1827 }))
1828 .unwrap();
1829 let error = catalog
1830 .register(Arc::new(BuiltinProviderSchema {
1831 provider_id: "github",
1832 harn_schema_name: "GitHubEventPayload",
1833 metadata: provider_metadata_entry(
1834 "github",
1835 &["webhook"],
1836 "GitHubEventPayload",
1837 &[],
1838 SignatureVerificationMetadata::None,
1839 Vec::new(),
1840 ProviderRuntimeMetadata::Placeholder,
1841 ),
1842 normalize: github_payload,
1843 }))
1844 .unwrap_err();
1845 assert_eq!(
1846 error,
1847 ProviderCatalogError::DuplicateProvider("github".to_string())
1848 );
1849 }
1850
1851 #[test]
1852 fn registered_provider_metadata_marks_builtin_connectors() {
1853 let entries = registered_provider_metadata();
1854 let builtin: Vec<&ProviderMetadata> = entries
1855 .iter()
1856 .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
1857 .collect();
1858
1859 assert_eq!(builtin.len(), 6);
1860 assert!(builtin.iter().any(|entry| entry.provider == "cron"));
1861 assert!(builtin.iter().any(|entry| entry.provider == "github"));
1862 assert!(builtin.iter().any(|entry| entry.provider == "linear"));
1863 assert!(builtin.iter().any(|entry| entry.provider == "notion"));
1864 assert!(builtin.iter().any(|entry| entry.provider == "slack"));
1865 assert!(builtin.iter().any(|entry| entry.provider == "webhook"));
1866 }
1867
1868 #[test]
1869 fn trigger_event_round_trip_is_stable() {
1870 let provider = ProviderId::from("github");
1871 let headers = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
1872 let payload = ProviderPayload::normalize(
1873 &provider,
1874 "issues",
1875 &sample_headers(),
1876 serde_json::json!({
1877 "action": "opened",
1878 "installation": {"id": 42},
1879 "issue": {"number": 99}
1880 }),
1881 )
1882 .unwrap();
1883 let event = TriggerEvent {
1884 id: TriggerEventId("trigger_evt_fixed".to_string()),
1885 provider,
1886 kind: "issues".to_string(),
1887 received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
1888 occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
1889 dedupe_key: "delivery-123".to_string(),
1890 trace_id: TraceId("trace_fixed".to_string()),
1891 tenant_id: Some(TenantId("tenant_1".to_string())),
1892 headers,
1893 provider_payload: payload,
1894 signature_status: SignatureStatus::Verified,
1895 dedupe_claimed: false,
1896 batch: None,
1897 };
1898
1899 let once = serde_json::to_value(&event).unwrap();
1900 let decoded: TriggerEvent = serde_json::from_value(once.clone()).unwrap();
1901 let twice = serde_json::to_value(&decoded).unwrap();
1902 assert_eq!(decoded, event);
1903 assert_eq!(once, twice);
1904 }
1905
1906 #[test]
1907 fn unknown_provider_errors() {
1908 let error = ProviderPayload::normalize(
1909 &ProviderId::from("custom-provider"),
1910 "thing.happened",
1911 &BTreeMap::new(),
1912 serde_json::json!({"ok": true}),
1913 )
1914 .unwrap_err();
1915 assert_eq!(
1916 error,
1917 ProviderCatalogError::UnknownProvider("custom-provider".to_string())
1918 );
1919 }
1920}