1use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
2use std::sync::{Arc, Mutex, RwLock};
3
4use async_trait::async_trait;
5use jsonwebtoken::{Algorithm, EncodingKey, Header};
6use reqwest::{Method, Response, StatusCode, Url};
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Map as JsonMap, Value as JsonValue};
10use sha2::{Digest, Sha256};
11use time::{Duration, OffsetDateTime};
12
13use crate::connectors::{
14 ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
15 HmacSignatureStyle, ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
16};
17use crate::event_log::{EventLog, LogEvent, Topic};
18use crate::secrets::{SecretBytes, SecretId, SecretVersion};
19use crate::triggers::{
20 redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus, TraceId,
21 TriggerEvent, TriggerEventId,
22};
23
24#[cfg(test)]
25mod tests;
26
/// Provider identifier under which this connector registers itself.
pub const GITHUB_PROVIDER_ID: &str = "github";
/// Event-log topic that receives low-remaining-quota observations.
const GITHUB_RATE_LIMIT_TOPIC: &str = "connectors.github.rate_limit";
/// Value sent in the `X-GitHub-Api-Version` header on every outbound request.
const GITHUB_API_VERSION: &str = "2022-11-28";
/// API origin used when the caller does not supply `api_base_url`.
const DEFAULT_API_BASE_URL: &str = "https://api.github.com";
31
32pub struct GitHubConnector {
33 provider_id: ProviderId,
34 kinds: Vec<TriggerKind>,
35 state: Arc<GitHubConnectorState>,
36 client: Arc<GitHubClient>,
37}
38
39#[derive(Default)]
40struct GitHubConnectorState {
41 ctx: RwLock<Option<ConnectorCtx>>,
42 bindings: RwLock<HashMap<String, ActivatedGitHubBinding>>,
43}
44
45#[derive(Clone, Debug)]
46struct ActivatedGitHubBinding {
47 binding_id: String,
48 path: Option<String>,
49 signing_secret: SecretId,
50 dedupe_enabled: bool,
51 dedupe_ttl: std::time::Duration,
52}
53
54struct GitHubClient {
55 provider_id: ProviderId,
56 state: Arc<GitHubConnectorState>,
57 http: reqwest::Client,
58 tokens: GitHubInstallationTokenStore,
59}
60
61struct GitHubInstallationTokenStore {
62 capacity: usize,
63 state: Mutex<TokenCacheState>,
64}
65
66#[derive(Default)]
67struct TokenCacheState {
68 entries: HashMap<u64, InstallationTokenEntry>,
69 order: VecDeque<u64>,
70}
71
72struct InstallationTokenEntry {
73 token: SecretBytes,
74 refresh_at: OffsetDateTime,
75}
76
77#[derive(Clone, Debug)]
78struct ResolvedGitHubClientConfig {
79 app_id: u64,
80 installation_id: u64,
81 api_base_url: String,
82 private_key: PrivateKeySource,
83}
84
85#[derive(Clone, Debug)]
86enum PrivateKeySource {
87 Inline(String),
88 Secret(SecretId),
89}
90
91#[allow(dead_code)]
92#[derive(Debug)]
93enum ParsedGitHubEvent {
94 Issues(IssuesEvent),
95 PullRequest(PullRequestEvent),
96 IssueComment(IssueCommentEvent),
97 PullRequestReview(PullRequestReviewEvent),
98 Push(PushEvent),
99 WorkflowRun(WorkflowRunEvent),
100 DeploymentStatus(DeploymentStatusEvent),
101 CheckRun(CheckRunEvent),
102 Other { kind: String, raw: JsonValue },
103}
104
105#[allow(dead_code)]
106#[derive(Debug, Deserialize)]
107struct IssuesEvent {
108 action: Option<String>,
109 issue: JsonValue,
110}
111
112#[allow(dead_code)]
113#[derive(Debug, Deserialize)]
114struct PullRequestEvent {
115 action: Option<String>,
116 pull_request: JsonValue,
117}
118
119#[allow(dead_code)]
120#[derive(Debug, Deserialize)]
121struct IssueCommentEvent {
122 action: Option<String>,
123 comment: JsonValue,
124 issue: JsonValue,
125}
126
127#[allow(dead_code)]
128#[derive(Debug, Deserialize)]
129struct PullRequestReviewEvent {
130 action: Option<String>,
131 review: JsonValue,
132 pull_request: JsonValue,
133}
134
135#[allow(dead_code)]
136#[derive(Debug, Deserialize)]
137struct PushEvent {
138 #[serde(default)]
139 commits: Vec<JsonValue>,
140 #[serde(default)]
141 distinct_size: Option<u64>,
142}
143
144#[allow(dead_code)]
145#[derive(Debug, Deserialize)]
146struct WorkflowRunEvent {
147 action: Option<String>,
148 workflow_run: JsonValue,
149}
150
151#[allow(dead_code)]
152#[derive(Debug, Deserialize)]
153struct DeploymentStatusEvent {
154 action: Option<String>,
155 deployment_status: JsonValue,
156 deployment: JsonValue,
157}
158
159#[allow(dead_code)]
160#[derive(Debug, Deserialize)]
161struct CheckRunEvent {
162 action: Option<String>,
163 check_run: JsonValue,
164}
165
166#[derive(Debug, Default, Deserialize)]
167struct GitHubBindingConfig {
168 #[serde(default, rename = "match")]
169 match_config: GitHubMatchConfig,
170 #[serde(default)]
171 secrets: GitHubSecretsConfig,
172}
173
174#[derive(Debug, Default, Deserialize)]
175struct GitHubMatchConfig {
176 path: Option<String>,
177}
178
179#[derive(Debug, Default, Deserialize)]
180struct GitHubSecretsConfig {
181 signing_secret: Option<String>,
182 private_key: Option<String>,
183}
184
185#[derive(Debug, Default, Deserialize)]
186struct GitHubClientConfigArgs {
187 app_id: Option<u64>,
188 installation_id: Option<u64>,
189 api_base_url: Option<String>,
190 private_key_pem: Option<String>,
191 private_key_secret: Option<String>,
192 #[serde(default)]
193 secrets: GitHubSecretsConfig,
194}
195
196#[derive(Debug, Deserialize)]
197struct CommentArgs {
198 #[serde(flatten)]
199 config: GitHubClientConfigArgs,
200 issue_url: String,
201 body: String,
202}
203
204#[derive(Debug, Deserialize)]
205struct AddLabelsArgs {
206 #[serde(flatten)]
207 config: GitHubClientConfigArgs,
208 issue_url: String,
209 labels: Vec<String>,
210}
211
212#[derive(Debug, Deserialize)]
213struct RequestReviewArgs {
214 #[serde(flatten)]
215 config: GitHubClientConfigArgs,
216 pr_url: String,
217 reviewers: Vec<String>,
218}
219
220#[derive(Debug, Deserialize)]
221struct MergePrArgs {
222 #[serde(flatten)]
223 config: GitHubClientConfigArgs,
224 pr_url: String,
225 #[serde(default)]
226 commit_title: Option<String>,
227 #[serde(default)]
228 commit_message: Option<String>,
229 #[serde(default)]
230 merge_method: Option<String>,
231 #[serde(default)]
232 sha: Option<String>,
233 #[serde(default)]
234 admin_override: bool,
235}
236
237#[derive(Debug, Deserialize)]
238struct ListStalePrsArgs {
239 #[serde(flatten)]
240 config: GitHubClientConfigArgs,
241 repo: String,
242 days: i64,
243}
244
245#[derive(Debug, Deserialize)]
246struct GetPrDiffArgs {
247 #[serde(flatten)]
248 config: GitHubClientConfigArgs,
249 pr_url: String,
250}
251
252#[derive(Debug, Deserialize)]
253struct CreateIssueArgs {
254 #[serde(flatten)]
255 config: GitHubClientConfigArgs,
256 repo: String,
257 title: String,
258 #[serde(default)]
259 body: Option<String>,
260 #[serde(default)]
261 labels: Option<Vec<String>>,
262}
263
264#[derive(Debug, Deserialize)]
265struct ApiCallArgs {
266 #[serde(flatten)]
267 config: GitHubClientConfigArgs,
268 path: String,
269 method: String,
270 #[serde(default)]
271 body: Option<JsonValue>,
272 #[serde(default)]
273 accept: Option<String>,
274}
275
276#[derive(Debug, Deserialize)]
277struct InstallationTokenResponse {
278 token: String,
279 #[serde(default)]
280 expires_at: Option<String>,
281}
282
283#[derive(Debug, Serialize)]
284struct GitHubJwtClaims {
285 iat: i64,
286 exp: i64,
287 iss: String,
288}
289
/// Owner/name pair identifying a repository.
#[derive(Clone, Debug)]
struct RepoRef {
    /// Account or organization that owns the repository.
    owner: String,
    /// Repository name.
    repo: String,
}
295
296#[derive(Clone, Debug)]
297struct IssueRef {
298 repo: RepoRef,
299 number: u64,
300}
301
302impl GitHubConnector {
303 pub fn new() -> Self {
304 let state = Arc::new(GitHubConnectorState::default());
305 let client = Arc::new(GitHubClient {
306 provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
307 state: state.clone(),
308 http: reqwest::Client::builder()
309 .user_agent("harn-github-connector")
310 .build()
311 .unwrap_or_else(|_| reqwest::Client::new()),
312 tokens: GitHubInstallationTokenStore::new(32),
313 });
314 Self {
315 provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
316 kinds: vec![TriggerKind::from("webhook")],
317 state,
318 client,
319 }
320 }
321
322 fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedGitHubBinding, ConnectorError> {
323 let bindings = self
324 .state
325 .bindings
326 .read()
327 .expect("github connector bindings poisoned");
328 if let Some(binding_id) = raw.metadata.get("binding_id").and_then(JsonValue::as_str) {
329 return bindings.get(binding_id).cloned().ok_or_else(|| {
330 ConnectorError::Unsupported(format!(
331 "github connector has no active binding `{binding_id}`"
332 ))
333 });
334 }
335 if bindings.len() == 1 {
336 return bindings
337 .values()
338 .next()
339 .cloned()
340 .ok_or_else(|| ConnectorError::Activation("github bindings missing".to_string()));
341 }
342 Err(ConnectorError::Unsupported(
343 "github connector requires raw.metadata.binding_id when multiple bindings are active"
344 .to_string(),
345 ))
346 }
347
348 fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
349 self.state
350 .ctx
351 .read()
352 .expect("github connector ctx poisoned")
353 .clone()
354 .ok_or_else(|| {
355 ConnectorError::Activation(
356 "github connector must be initialized before use".to_string(),
357 )
358 })
359 }
360}
361
362impl Default for GitHubConnector {
363 fn default() -> Self {
364 Self::new()
365 }
366}
367
368#[async_trait]
369impl Connector for GitHubConnector {
370 fn provider_id(&self) -> &ProviderId {
371 &self.provider_id
372 }
373
374 fn kinds(&self) -> &[TriggerKind] {
375 &self.kinds
376 }
377
378 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
379 *self
380 .state
381 .ctx
382 .write()
383 .expect("github connector ctx poisoned") = Some(ctx);
384 Ok(())
385 }
386
387 async fn activate(
388 &self,
389 bindings: &[TriggerBinding],
390 ) -> Result<ActivationHandle, ConnectorError> {
391 let mut configured = HashMap::new();
392 let mut paths = BTreeSet::new();
393 for binding in bindings {
394 let activated = ActivatedGitHubBinding::from_binding(binding)?;
395 if let Some(path) = &activated.path {
396 if !paths.insert(path.clone()) {
397 return Err(ConnectorError::Activation(format!(
398 "github connector path `{path}` is configured by multiple bindings"
399 )));
400 }
401 }
402 configured.insert(binding.binding_id.clone(), activated);
403 }
404 *self
405 .state
406 .bindings
407 .write()
408 .expect("github connector bindings poisoned") = configured;
409 Ok(ActivationHandle::new(
410 self.provider_id.clone(),
411 bindings.len(),
412 ))
413 }
414
415 async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
416 let ctx = self.ctx()?;
417 let binding = self.binding_for_raw(&raw)?;
418 let provider = self.provider_id.clone();
419 let received_at = raw.received_at;
420 let headers = effective_headers(&raw.headers);
421 let secret = load_secret_text_blocking(&ctx, &binding.signing_secret)?;
422 futures::executor::block_on(crate::connectors::hmac::verify_hmac_signed(
423 ctx.event_log.as_ref(),
424 &provider,
425 HmacSignatureStyle::github(),
426 &raw.body,
427 &headers,
428 secret.as_str(),
429 None,
430 received_at,
431 ))?;
432
433 let payload = raw.json_body()?;
434 let event_kind = header_value(&headers, "x-github-event")
435 .map(ToString::to_string)
436 .or_else(|| {
437 if raw.kind.trim().is_empty() {
438 None
439 } else {
440 Some(raw.kind.clone())
441 }
442 })
443 .ok_or_else(|| ConnectorError::MissingHeader("X-GitHub-Event".to_string()))?;
444 let _typed = parse_typed_event(&event_kind, &payload)?;
445 let dedupe_key = header_value(&headers, "x-github-delivery")
446 .map(ToString::to_string)
447 .unwrap_or_else(|| fallback_body_digest(&raw.body));
448 if binding.dedupe_enabled {
449 let inserted = ctx
450 .inbox
451 .insert_if_new(&binding.binding_id, &dedupe_key, binding.dedupe_ttl)
452 .await?;
453 if !inserted {
454 return Err(ConnectorError::DuplicateDelivery(format!(
455 "duplicate GitHub delivery `{dedupe_key}` for binding `{}`",
456 binding.binding_id
457 )));
458 }
459 }
460
461 let provider_payload =
462 ProviderPayload::normalize(&provider, &event_kind, &headers, payload)
463 .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
464 Ok(TriggerEvent {
465 id: TriggerEventId::new(),
466 provider,
467 kind: event_kind,
468 received_at,
469 occurred_at: raw.occurred_at,
470 dedupe_key,
471 trace_id: TraceId::new(),
472 tenant_id: raw.tenant_id.clone(),
473 headers: redact_headers(&headers, &HeaderRedactionPolicy::default()),
474 batch: None,
475 raw_body: Some(raw.body.clone()),
476 provider_payload,
477 signature_status: SignatureStatus::Verified,
478 dedupe_claimed: false,
479 })
480 }
481
482 fn payload_schema(&self) -> ProviderPayloadSchema {
483 ProviderPayloadSchema::named("GitHubEventPayload")
484 }
485
486 fn client(&self) -> Arc<dyn ConnectorClient> {
487 self.client.clone()
488 }
489}
490
491#[async_trait]
492impl ConnectorClient for GitHubClient {
493 async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError> {
494 match method {
495 "comment" => {
496 let args: CommentArgs = parse_args(args)?;
497 let config = self.resolve_client_config(&args.config)?;
498 let issue = parse_issue_like_url(&args.issue_url, "issue")?;
499 self.request_json(
500 &config,
501 Method::POST,
502 &format!(
503 "/repos/{}/{}/issues/{}/comments",
504 issue.repo.owner, issue.repo.repo, issue.number
505 ),
506 Some(json!({ "body": args.body })),
507 "application/vnd.github+json",
508 )
509 .await
510 }
511 "add_labels" => {
512 let args: AddLabelsArgs = parse_args(args)?;
513 let config = self.resolve_client_config(&args.config)?;
514 let issue = parse_issue_like_url(&args.issue_url, "issue")?;
515 self.request_json(
516 &config,
517 Method::POST,
518 &format!(
519 "/repos/{}/{}/issues/{}/labels",
520 issue.repo.owner, issue.repo.repo, issue.number
521 ),
522 Some(json!({ "labels": args.labels })),
523 "application/vnd.github+json",
524 )
525 .await
526 }
527 "request_review" => {
528 let args: RequestReviewArgs = parse_args(args)?;
529 let config = self.resolve_client_config(&args.config)?;
530 let pr = parse_issue_like_url(&args.pr_url, "pull")?;
531 self.request_json(
532 &config,
533 Method::POST,
534 &format!(
535 "/repos/{}/{}/pulls/{}/requested_reviewers",
536 pr.repo.owner, pr.repo.repo, pr.number
537 ),
538 Some(json!({ "reviewers": args.reviewers })),
539 "application/vnd.github+json",
540 )
541 .await
542 }
543 "merge_pr" => {
544 let args: MergePrArgs = parse_args(args)?;
545 let config = self.resolve_client_config(&args.config)?;
546 let pr = parse_issue_like_url(&args.pr_url, "pull")?;
547 let mut body = JsonMap::new();
548 if let Some(value) = args.commit_title {
549 body.insert("commit_title".to_string(), JsonValue::String(value));
550 }
551 if let Some(value) = args.commit_message {
552 body.insert("commit_message".to_string(), JsonValue::String(value));
553 }
554 if let Some(value) = args.merge_method {
555 body.insert("merge_method".to_string(), JsonValue::String(value));
556 }
557 if let Some(value) = args.sha {
558 body.insert("sha".to_string(), JsonValue::String(value));
559 }
560 let mut response = self
561 .request_json(
562 &config,
563 Method::PUT,
564 &format!(
565 "/repos/{}/{}/pulls/{}/merge",
566 pr.repo.owner, pr.repo.repo, pr.number
567 ),
568 Some(JsonValue::Object(body)),
569 "application/vnd.github+json",
570 )
571 .await?;
572 if args.admin_override {
573 if let Some(map) = response.as_object_mut() {
574 map.insert(
575 "admin_override_requested".to_string(),
576 JsonValue::Bool(true),
577 );
578 }
579 }
580 Ok(response)
581 }
582 "list_stale_prs" => {
583 let args: ListStalePrsArgs = parse_args(args)?;
584 let config = self.resolve_client_config(&args.config)?;
585 let repo = parse_repo_ref(&args.repo)?;
586 let stale_before = (OffsetDateTime::now_utc() - Duration::days(args.days))
587 .date()
588 .to_string();
589 let query = format!(
590 "repo:{}/{} is:pr is:open updated:<{}",
591 repo.owner, repo.repo, stale_before
592 );
593 let url = Url::parse_with_params(
594 &absolute_api_url(&config.api_base_url, "/search/issues")?,
595 &[("q", query)],
596 )
597 .map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
598 let response = self
599 .request_response(
600 &config,
601 Method::GET,
602 url.to_string(),
603 None,
604 "application/vnd.github+json",
605 )
606 .await?;
607 response
608 .json::<JsonValue>()
609 .await
610 .map_err(|error| ClientError::Transport(error.to_string()))
611 }
612 "get_pr_diff" => {
613 let args: GetPrDiffArgs = parse_args(args)?;
614 let config = self.resolve_client_config(&args.config)?;
615 let pr = parse_issue_like_url(&args.pr_url, "pull")?;
616 let text = self
617 .request_text(
618 &config,
619 Method::GET,
620 &format!(
621 "/repos/{}/{}/pulls/{}",
622 pr.repo.owner, pr.repo.repo, pr.number
623 ),
624 "application/vnd.github.diff",
625 )
626 .await?;
627 Ok(JsonValue::String(text))
628 }
629 "create_issue" => {
630 let args: CreateIssueArgs = parse_args(args)?;
631 let config = self.resolve_client_config(&args.config)?;
632 let repo = parse_repo_ref(&args.repo)?;
633 let mut body = JsonMap::new();
634 body.insert("title".to_string(), JsonValue::String(args.title));
635 if let Some(value) = args.body {
636 body.insert("body".to_string(), JsonValue::String(value));
637 }
638 if let Some(labels) = args.labels {
639 body.insert("labels".to_string(), json!(labels));
640 }
641 self.request_json(
642 &config,
643 Method::POST,
644 &format!("/repos/{}/{}/issues", repo.owner, repo.repo),
645 Some(JsonValue::Object(body)),
646 "application/vnd.github+json",
647 )
648 .await
649 }
650 "api_call" => {
651 let args: ApiCallArgs = parse_args(args)?;
652 let config = self.resolve_client_config(&args.config)?;
653 let method = Method::from_bytes(args.method.as_bytes())
654 .map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
655 self.request_json(
656 &config,
657 method,
658 &args.path,
659 args.body,
660 args.accept
661 .as_deref()
662 .unwrap_or("application/vnd.github+json"),
663 )
664 .await
665 }
666 other => Err(ClientError::MethodNotFound(format!(
667 "github connector does not implement outbound method `{other}`"
668 ))),
669 }
670 }
671}
672
673impl GitHubClient {
674 fn resolve_client_config(
675 &self,
676 args: &GitHubClientConfigArgs,
677 ) -> Result<ResolvedGitHubClientConfig, ClientError> {
678 let app_id = args.app_id.ok_or_else(|| {
679 ClientError::InvalidArgs("github connector requires app_id".to_string())
680 })?;
681 let installation_id = args.installation_id.ok_or_else(|| {
682 ClientError::InvalidArgs("github connector requires installation_id".to_string())
683 })?;
684 let private_key = if let Some(secret_id) = args
685 .private_key_secret
686 .as_deref()
687 .or(args.secrets.private_key.as_deref())
688 .and_then(|value| parse_secret_id(Some(value)))
689 {
690 PrivateKeySource::Secret(secret_id)
691 } else if let Some(pem) = args.private_key_pem.clone() {
692 PrivateKeySource::Inline(pem)
693 } else {
694 return Err(ClientError::InvalidArgs(
695 "github connector requires private_key_secret or private_key_pem".to_string(),
696 ));
697 };
698 Ok(ResolvedGitHubClientConfig {
699 app_id,
700 installation_id,
701 api_base_url: args
702 .api_base_url
703 .clone()
704 .unwrap_or_else(|| DEFAULT_API_BASE_URL.to_string()),
705 private_key,
706 })
707 }
708
709 fn ctx(&self) -> Result<ConnectorCtx, ClientError> {
710 self.state
711 .ctx
712 .read()
713 .expect("github connector ctx poisoned")
714 .clone()
715 .ok_or_else(|| ClientError::Other("github connector must be initialized".to_string()))
716 }
717
718 async fn request_json(
719 &self,
720 config: &ResolvedGitHubClientConfig,
721 method: Method,
722 path: &str,
723 body: Option<JsonValue>,
724 accept: &str,
725 ) -> Result<JsonValue, ClientError> {
726 let url = absolute_api_url(&config.api_base_url, path)?;
727 let response = self
728 .request_response(config, method, url, body, accept)
729 .await?;
730 response
731 .json::<JsonValue>()
732 .await
733 .map_err(|error| ClientError::Transport(error.to_string()))
734 }
735
736 async fn request_text(
737 &self,
738 config: &ResolvedGitHubClientConfig,
739 method: Method,
740 path: &str,
741 accept: &str,
742 ) -> Result<String, ClientError> {
743 let url = absolute_api_url(&config.api_base_url, path)?;
744 let response = self
745 .request_response(config, method, url, None, accept)
746 .await?;
747 response
748 .text()
749 .await
750 .map_err(|error| ClientError::Transport(error.to_string()))
751 }
752
753 async fn request_response(
754 &self,
755 config: &ResolvedGitHubClientConfig,
756 method: Method,
757 url: String,
758 body: Option<JsonValue>,
759 accept: &str,
760 ) -> Result<Response, ClientError> {
761 let mut retried_401 = false;
762 let mut retried_rate_limit = false;
763 loop {
764 let token = self.installation_token(config).await?;
765 let token_text = token.with_exposed(|bytes| String::from_utf8_lossy(bytes).to_string());
766 let ctx = self.ctx()?;
767 ctx.rate_limiter
768 .scoped(
769 &self.provider_id,
770 format!("installation:{}", config.installation_id),
771 )
772 .acquire()
773 .await;
774
775 let mut request = self
776 .http
777 .request(method.clone(), &url)
778 .header("Accept", accept)
779 .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
780 .header("Authorization", format!("Bearer {token_text}"));
781 if let Some(payload) = body.clone() {
782 request = request.json(&payload);
783 }
784 let response = request
785 .send()
786 .await
787 .map_err(|error| ClientError::Transport(error.to_string()))?;
788 self.record_rate_limit_observation(&response).await;
789
790 if response.status() == StatusCode::UNAUTHORIZED && !retried_401 {
791 self.tokens.invalidate(config.installation_id);
792 retried_401 = true;
793 continue;
794 }
795 if is_rate_limited(&response) && !retried_rate_limit {
796 tokio::time::sleep(rate_limit_backoff(&response)).await;
797 retried_rate_limit = true;
798 continue;
799 }
800 if !response.status().is_success() {
801 let status = response.status();
802 let body = response.text().await.unwrap_or_default();
803 let message = if body.trim().is_empty() {
804 format!("github API request failed with status {status}")
805 } else {
806 format!("github API request failed with status {status}: {body}")
807 };
808 return Err(if is_rate_limited_status(status) {
809 ClientError::RateLimited(message)
810 } else {
811 ClientError::Transport(message)
812 });
813 }
814 return Ok(response);
815 }
816 }
817
818 async fn installation_token(
819 &self,
820 config: &ResolvedGitHubClientConfig,
821 ) -> Result<SecretBytes, ClientError> {
822 let now = OffsetDateTime::now_utc();
823 if let Some(token) = self.tokens.get(config.installation_id, now) {
824 return Ok(token);
825 }
826 let jwt = self.mint_app_jwt(config).await?;
827 let url = absolute_api_url(
828 &config.api_base_url,
829 &format!(
830 "/app/installations/{}/access_tokens",
831 config.installation_id
832 ),
833 )?;
834 let ctx = self.ctx()?;
835 ctx.rate_limiter
836 .scoped(
837 &self.provider_id,
838 format!("installation:{}", config.installation_id),
839 )
840 .acquire()
841 .await;
842 let response = self
843 .http
844 .post(url)
845 .header("Accept", "application/vnd.github+json")
846 .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
847 .header("Authorization", format!("Bearer {jwt}"))
848 .send()
849 .await
850 .map_err(|error| ClientError::Transport(error.to_string()))?;
851 self.record_rate_limit_observation(&response).await;
852 if !response.status().is_success() {
853 let status = response.status();
854 let body = response.text().await.unwrap_or_default();
855 return Err(ClientError::Transport(format!(
856 "failed to create GitHub installation token ({status}): {body}"
857 )));
858 }
859 let payload: InstallationTokenResponse = response
860 .json()
861 .await
862 .map_err(|error| ClientError::Transport(error.to_string()))?;
863 let refresh_at = installation_token_refresh_at(payload.expires_at.as_deref(), now);
864 let token = SecretBytes::from(payload.token);
865 let result = token.reborrow();
866 self.tokens.store(config.installation_id, token, refresh_at);
867 Ok(result)
868 }
869
870 async fn mint_app_jwt(
871 &self,
872 config: &ResolvedGitHubClientConfig,
873 ) -> Result<String, ClientError> {
874 let pem = self.private_key_pem(config).await?;
875 let now = OffsetDateTime::now_utc();
876 let claims = GitHubJwtClaims {
877 iat: (now - Duration::seconds(60)).unix_timestamp(),
878 exp: (now + Duration::minutes(9)).unix_timestamp(),
879 iss: config.app_id.to_string(),
880 };
881 let header = Header::new(Algorithm::RS256);
882 let key = EncodingKey::from_rsa_pem(pem.as_bytes())
883 .map_err(|error| ClientError::Other(error.to_string()))?;
884 jsonwebtoken::encode(&header, &claims, &key)
885 .map_err(|error| ClientError::Other(error.to_string()))
886 }
887
888 async fn private_key_pem(
889 &self,
890 config: &ResolvedGitHubClientConfig,
891 ) -> Result<String, ClientError> {
892 match &config.private_key {
893 PrivateKeySource::Inline(pem) => Ok(pem.clone()),
894 PrivateKeySource::Secret(id) => {
895 let ctx = self.ctx()?;
896 let secret = ctx
897 .secrets
898 .get(id)
899 .await
900 .map_err(|error| ClientError::Other(error.to_string()))?;
901 secret.with_exposed(|bytes| {
902 std::str::from_utf8(bytes)
903 .map(|value| value.to_string())
904 .map_err(|error| ClientError::Other(error.to_string()))
905 })
906 }
907 }
908 }
909
910 async fn record_rate_limit_observation(&self, response: &Response) {
911 let remaining = response
912 .headers()
913 .get("x-ratelimit-remaining")
914 .and_then(|value| value.to_str().ok())
915 .and_then(|value| value.parse::<u64>().ok());
916 let reset = response
917 .headers()
918 .get("x-ratelimit-reset")
919 .and_then(|value| value.to_str().ok())
920 .and_then(|value| value.parse::<i64>().ok());
921 let Some(remaining) = remaining else {
922 return;
923 };
924 if remaining > 10 {
925 return;
926 }
927 let Ok(ctx) = self.ctx() else {
928 return;
929 };
930 let Ok(topic) = Topic::new(GITHUB_RATE_LIMIT_TOPIC) else {
931 return;
932 };
933 let _ = ctx
934 .event_log
935 .append(
936 &topic,
937 LogEvent::new(
938 "github.rate_limit",
939 json!({
940 "remaining": remaining,
941 "reset": reset,
942 "status": response.status().as_u16(),
943 }),
944 ),
945 )
946 .await;
947 }
948}
949
950impl ActivatedGitHubBinding {
951 fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
952 let config: GitHubBindingConfig =
953 serde_json::from_value(binding.config.clone()).map_err(|error| {
954 ConnectorError::Activation(format!(
955 "github binding `{}` has invalid config: {error}",
956 binding.binding_id
957 ))
958 })?;
959 let signing_secret =
960 parse_secret_id(config.secrets.signing_secret.as_deref()).ok_or_else(|| {
961 ConnectorError::Activation(format!(
962 "github binding `{}` requires secrets.signing_secret",
963 binding.binding_id
964 ))
965 })?;
966 Ok(Self {
967 binding_id: binding.binding_id.clone(),
968 path: config.match_config.path,
969 signing_secret,
970 dedupe_enabled: binding.dedupe_key.is_some(),
971 dedupe_ttl: std::time::Duration::from_secs(
972 u64::from(crate::triggers::DEFAULT_INBOX_RETENTION_DAYS) * 24 * 60 * 60,
973 ),
974 })
975 }
976}
977
978impl GitHubInstallationTokenStore {
979 fn new(capacity: usize) -> Self {
980 Self {
981 capacity: capacity.max(1),
982 state: Mutex::new(TokenCacheState::default()),
983 }
984 }
985
986 fn get(&self, installation_id: u64, now: OffsetDateTime) -> Option<SecretBytes> {
987 let mut state = self.state.lock().expect("github token cache poisoned");
988 let refresh_at = state
989 .entries
990 .get(&installation_id)
991 .map(|entry| entry.refresh_at)?;
992 if refresh_at <= now {
993 state.entries.remove(&installation_id);
994 state.order.retain(|id| *id != installation_id);
995 return None;
996 }
997 touch_lru(&mut state.order, installation_id);
998 state
999 .entries
1000 .get(&installation_id)
1001 .map(|entry| entry.token.reborrow())
1002 }
1003
1004 fn store(&self, installation_id: u64, token: SecretBytes, refresh_at: OffsetDateTime) {
1005 let mut state = self.state.lock().expect("github token cache poisoned");
1006 state.entries.insert(
1007 installation_id,
1008 InstallationTokenEntry { token, refresh_at },
1009 );
1010 touch_lru(&mut state.order, installation_id);
1011 while state.entries.len() > self.capacity {
1012 if let Some(evicted) = state.order.pop_front() {
1013 state.entries.remove(&evicted);
1014 }
1015 }
1016 }
1017
1018 fn invalidate(&self, installation_id: u64) {
1019 let mut state = self.state.lock().expect("github token cache poisoned");
1020 state.entries.remove(&installation_id);
1021 state.order.retain(|id| *id != installation_id);
1022 }
1023}
1024
/// Marks `installation_id` as most recently used: every existing occurrence
/// is removed from `order` and the id is appended at the back.
fn touch_lru(order: &mut VecDeque<u64>, installation_id: u64) {
    order.retain(|&queued| queued != installation_id);
    order.push_back(installation_id);
}
1029
1030fn parse_args<T: DeserializeOwned>(args: JsonValue) -> Result<T, ClientError> {
1031 serde_json::from_value(args).map_err(|error| ClientError::InvalidArgs(error.to_string()))
1032}
1033
1034fn parse_typed_event(kind: &str, payload: &JsonValue) -> Result<ParsedGitHubEvent, ConnectorError> {
1035 match kind {
1036 "issues" => serde_json::from_value(payload.clone())
1037 .map(ParsedGitHubEvent::Issues)
1038 .map_err(|error| ConnectorError::Json(error.to_string())),
1039 "pull_request" => serde_json::from_value(payload.clone())
1040 .map(ParsedGitHubEvent::PullRequest)
1041 .map_err(|error| ConnectorError::Json(error.to_string())),
1042 "issue_comment" => serde_json::from_value(payload.clone())
1043 .map(ParsedGitHubEvent::IssueComment)
1044 .map_err(|error| ConnectorError::Json(error.to_string())),
1045 "pull_request_review" => serde_json::from_value(payload.clone())
1046 .map(ParsedGitHubEvent::PullRequestReview)
1047 .map_err(|error| ConnectorError::Json(error.to_string())),
1048 "push" => serde_json::from_value(payload.clone())
1049 .map(ParsedGitHubEvent::Push)
1050 .map_err(|error| ConnectorError::Json(error.to_string())),
1051 "workflow_run" => serde_json::from_value(payload.clone())
1052 .map(ParsedGitHubEvent::WorkflowRun)
1053 .map_err(|error| ConnectorError::Json(error.to_string())),
1054 "deployment_status" => serde_json::from_value(payload.clone())
1055 .map(ParsedGitHubEvent::DeploymentStatus)
1056 .map_err(|error| ConnectorError::Json(error.to_string())),
1057 "check_run" => serde_json::from_value(payload.clone())
1058 .map(ParsedGitHubEvent::CheckRun)
1059 .map_err(|error| ConnectorError::Json(error.to_string())),
1060 other => Ok(ParsedGitHubEvent::Other {
1061 kind: other.to_string(),
1062 raw: payload.clone(),
1063 }),
1064 }
1065}
1066
1067fn effective_headers(headers: &BTreeMap<String, String>) -> BTreeMap<String, String> {
1068 let mut effective = headers.clone();
1069 canonicalize_header(headers, &mut effective, "content-type", "Content-Type");
1070 canonicalize_header(headers, &mut effective, "x-github-event", "X-GitHub-Event");
1071 canonicalize_header(
1072 headers,
1073 &mut effective,
1074 "x-github-delivery",
1075 "X-GitHub-Delivery",
1076 );
1077 canonicalize_header(
1078 headers,
1079 &mut effective,
1080 "x-github-hook-id",
1081 "X-GitHub-Hook-Id",
1082 );
1083 canonicalize_header(
1084 headers,
1085 &mut effective,
1086 "x-hub-signature-256",
1087 "X-Hub-Signature-256",
1088 );
1089 effective
1090}
1091
1092fn canonicalize_header(
1093 source: &BTreeMap<String, String>,
1094 target: &mut BTreeMap<String, String>,
1095 lookup_name: &str,
1096 canonical_name: &str,
1097) {
1098 if let Some(value) = header_value(source, lookup_name) {
1099 target
1100 .entry(canonical_name.to_string())
1101 .or_insert_with(|| value.to_string());
1102 }
1103}
1104
/// Finds a header by ASCII-case-insensitive name and returns its value.
///
/// When several keys match, the first one in the map's iteration order wins.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
1111
1112fn parse_secret_id(raw: Option<&str>) -> Option<SecretId> {
1113 let trimmed = raw?.trim();
1114 if trimmed.is_empty() {
1115 return None;
1116 }
1117 let (base, version) = match trimmed.rsplit_once('@') {
1118 Some((base, version_text)) => {
1119 let version = version_text.parse::<u64>().ok()?;
1120 (base, SecretVersion::Exact(version))
1121 }
1122 None => (trimmed, SecretVersion::Latest),
1123 };
1124 let (namespace, name) = base.split_once('/')?;
1125 if namespace.is_empty() || name.is_empty() {
1126 return None;
1127 }
1128 Some(SecretId::new(namespace, name).with_version(version))
1129}
1130
1131fn load_secret_text_blocking(
1132 ctx: &ConnectorCtx,
1133 secret_id: &SecretId,
1134) -> Result<String, ConnectorError> {
1135 let secret = futures::executor::block_on(ctx.secrets.get(secret_id))?;
1136 secret.with_exposed(|bytes| {
1137 std::str::from_utf8(bytes)
1138 .map(|value| value.to_string())
1139 .map_err(|error| {
1140 ConnectorError::Secret(format!(
1141 "github secret `{secret_id}` is not valid UTF-8: {error}"
1142 ))
1143 })
1144 })
1145}
1146
1147fn fallback_body_digest(body: &[u8]) -> String {
1148 let digest = Sha256::digest(body);
1149 format!("sha256:{}", hex::encode(digest))
1150}
1151
1152fn parse_repo_ref(input: &str) -> Result<RepoRef, ClientError> {
1153 let trimmed = input.trim().trim_matches('/');
1154 if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1155 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1156 if parts.len() >= 2 {
1157 return Ok(RepoRef {
1158 owner: parts[0].to_string(),
1159 repo: parts[1].to_string(),
1160 });
1161 }
1162 }
1163 if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1164 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1165 if parts.len() >= 2 {
1166 return Ok(RepoRef {
1167 owner: parts[0].to_string(),
1168 repo: parts[1].to_string(),
1169 });
1170 }
1171 }
1172 if let Some((owner, repo)) = trimmed.split_once('/') {
1173 if !owner.is_empty() && !repo.is_empty() {
1174 return Ok(RepoRef {
1175 owner: owner.to_string(),
1176 repo: repo.to_string(),
1177 });
1178 }
1179 }
1180 Err(ClientError::InvalidArgs(format!(
1181 "invalid GitHub repository reference `{input}`"
1182 )))
1183}
1184
1185fn parse_issue_like_url(input: &str, expected_kind: &str) -> Result<IssueRef, ClientError> {
1186 let trimmed = input.trim().trim_matches('/');
1187 if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1188 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1189 if parts.len() >= 4 {
1190 let kind = parts[2];
1191 if kind == expected_kind || (expected_kind == "issue" && kind == "issues") {
1192 return Ok(IssueRef {
1193 repo: RepoRef {
1194 owner: parts[0].to_string(),
1195 repo: parts[1].to_string(),
1196 },
1197 number: parts[3].parse().map_err(|error| {
1198 ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1199 })?,
1200 });
1201 }
1202 }
1203 }
1204 if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1205 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1206 if parts.len() >= 4 {
1207 let kind = parts[2];
1208 if kind == expected_kind
1209 || (expected_kind == "issue" && kind == "issues")
1210 || (expected_kind == "pull" && kind == "pulls")
1211 {
1212 return Ok(IssueRef {
1213 repo: RepoRef {
1214 owner: parts[0].to_string(),
1215 repo: parts[1].to_string(),
1216 },
1217 number: parts[3].parse().map_err(|error| {
1218 ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1219 })?,
1220 });
1221 }
1222 }
1223 }
1224 Err(ClientError::InvalidArgs(format!(
1225 "invalid GitHub {expected_kind} URL `{input}`"
1226 )))
1227}
1228
1229fn absolute_api_url(base_url: &str, path: &str) -> Result<String, ClientError> {
1230 let base = Url::parse(base_url).map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
1231 base.join(path.trim_start_matches('/'))
1232 .map(|value| value.to_string())
1233 .map_err(|error| ClientError::InvalidArgs(error.to_string()))
1234}
1235
1236fn installation_token_refresh_at(expires_at: Option<&str>, now: OffsetDateTime) -> OffsetDateTime {
1237 let eager_refresh = now + Duration::minutes(55);
1238 let Some(expires_at) = expires_at else {
1239 return eager_refresh;
1240 };
1241 let parsed =
1242 OffsetDateTime::parse(expires_at, &time::format_description::well_known::Rfc3339).ok();
1243 parsed
1244 .map(|value| value - Duration::minutes(5))
1245 .map(|value| {
1246 if value < eager_refresh {
1247 value
1248 } else {
1249 eager_refresh
1250 }
1251 })
1252 .unwrap_or(eager_refresh)
1253}
1254
1255fn is_rate_limited(response: &Response) -> bool {
1256 if is_rate_limited_status(response.status()) {
1257 return true;
1258 }
1259 response
1260 .headers()
1261 .get("x-ratelimit-remaining")
1262 .and_then(|value| value.to_str().ok())
1263 .map(|value| value == "0")
1264 .unwrap_or(false)
1265}
1266
1267fn is_rate_limited_status(status: StatusCode) -> bool {
1268 status == StatusCode::TOO_MANY_REQUESTS || status == StatusCode::FORBIDDEN
1269}
1270
1271fn rate_limit_backoff(response: &Response) -> std::time::Duration {
1272 if let Some(delay) = response
1273 .headers()
1274 .get("retry-after")
1275 .and_then(|value| value.to_str().ok())
1276 .and_then(|value| value.parse::<u64>().ok())
1277 {
1278 return std::time::Duration::from_secs(delay);
1279 }
1280 if let Some(reset_at) = response
1281 .headers()
1282 .get("x-ratelimit-reset")
1283 .and_then(|value| value.to_str().ok())
1284 .and_then(|value| value.parse::<i64>().ok())
1285 .and_then(|value| OffsetDateTime::from_unix_timestamp(value).ok())
1286 {
1287 let now = OffsetDateTime::now_utc();
1288 if reset_at > now {
1289 let delta = reset_at - now;
1290 return std::time::Duration::from_secs(delta.whole_seconds().max(0) as u64);
1291 }
1292 }
1293 std::time::Duration::from_millis(100)
1294}