1use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
2use std::sync::{Arc, Mutex, RwLock};
3
4use async_trait::async_trait;
5use jsonwebtoken::{Algorithm, EncodingKey, Header};
6use reqwest::{Method, Response, StatusCode, Url};
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Map as JsonMap, Value as JsonValue};
10use sha2::{Digest, Sha256};
11use time::{Duration, OffsetDateTime};
12
13use crate::connectors::{
14 ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
15 HmacSignatureStyle, ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
16};
17use crate::event_log::{EventLog, LogEvent, Topic};
18use crate::secrets::{SecretBytes, SecretId, SecretVersion};
19use crate::triggers::{
20 redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus, TraceId,
21 TriggerEvent, TriggerEventId,
22};
23
24#[cfg(test)]
25mod tests;
26
/// Provider identifier used to build this connector's `ProviderId`.
pub const GITHUB_PROVIDER_ID: &str = "github";
/// Event-log topic that receives low-remaining-quota rate-limit observations.
const GITHUB_RATE_LIMIT_TOPIC: &str = "connectors.github.rate_limit";
/// Value sent in the `X-GitHub-Api-Version` header on outbound requests.
const GITHUB_API_VERSION: &str = "2022-11-28";
/// API base URL used when the caller does not supply `api_base_url`.
const DEFAULT_API_BASE_URL: &str = "https://api.github.com";
31
/// GitHub connector: normalizes inbound webhook deliveries into trigger
/// events and hands out an outbound API client.
pub struct GitHubConnector {
    /// Provider identity returned by `provider_id()`.
    provider_id: ProviderId,
    /// Trigger kinds returned by `kinds()`; `new()` registers only `webhook`.
    kinds: Vec<TriggerKind>,
    /// State shared with the outbound client (runtime context and bindings).
    state: Arc<GitHubConnectorState>,
    /// Outbound client returned by `client()`.
    client: Arc<GitHubClient>,
}
38
/// Mutable state shared between the connector and its outbound client.
#[derive(Default)]
struct GitHubConnectorState {
    /// Runtime context; `None` until `init` stores one.
    ctx: RwLock<Option<ConnectorCtx>>,
    /// Active bindings keyed by binding id; replaced wholesale by `activate`.
    bindings: RwLock<HashMap<String, ActivatedGitHubBinding>>,
}
44
/// A trigger binding after its GitHub-specific configuration was validated.
#[derive(Clone, Debug)]
struct ActivatedGitHubBinding {
    /// Identifier copied from the originating `TriggerBinding`.
    binding_id: String,
    /// Optional `match.path` value; `activate` rejects duplicates across bindings.
    path: Option<String>,
    /// Secret holding the webhook signing key used for HMAC verification.
    signing_secret: SecretId,
    /// True when the originating binding declares a `dedupe_key`.
    dedupe_enabled: bool,
    /// How long a delivery key is remembered by the inbox for de-duplication.
    dedupe_ttl: std::time::Duration,
}
53
/// Outbound GitHub API client that authenticates with installation tokens.
struct GitHubClient {
    /// Provider identity used to scope the rate limiter.
    provider_id: ProviderId,
    /// State shared with the connector; the client reads the runtime context from it.
    state: Arc<GitHubConnectorState>,
    /// HTTP client used for every outbound request.
    http: reqwest::Client,
    /// Cache of installation access tokens.
    tokens: GitHubInstallationTokenStore,
}
60
/// Bounded cache of installation tokens with least-recently-used eviction.
struct GitHubInstallationTokenStore {
    /// Maximum number of cached tokens; `new` clamps this to at least 1.
    capacity: usize,
    /// Cached entries and their recency order, guarded by one mutex.
    state: Mutex<TokenCacheState>,
}
65
/// Contents of the installation token cache.
#[derive(Default)]
struct TokenCacheState {
    /// Cached tokens keyed by installation id.
    entries: HashMap<u64, InstallationTokenEntry>,
    /// Installation ids in recency order: front is evicted first, back is most recent.
    order: VecDeque<u64>,
}
71
/// One cached installation token.
struct InstallationTokenEntry {
    /// The installation access token.
    token: SecretBytes,
    /// Instant from which the cache stops returning this token (`refresh_at <= now`).
    refresh_at: OffsetDateTime,
}
76
/// Client configuration after required fields were checked and defaults applied.
#[derive(Clone, Debug)]
struct ResolvedGitHubClientConfig {
    /// GitHub App id; used as the `iss` claim of the app JWT.
    app_id: u64,
    /// Installation whose access token authenticates API requests.
    installation_id: u64,
    /// API base URL; defaults to `DEFAULT_API_BASE_URL` when not supplied.
    api_base_url: String,
    /// Where the app's private key comes from.
    private_key: PrivateKeySource,
}
84
/// Source of the PEM private key used to sign the app JWT.
#[derive(Clone, Debug)]
enum PrivateKeySource {
    /// PEM text supplied directly in the call arguments.
    Inline(String),
    /// Reference to a secret fetched from the context's secret provider.
    Secret(SecretId),
}
90
/// Typed view of a webhook payload, selected by the event kind string in
/// `parse_typed_event`.
// `dead_code` is allowed because `normalize_inbound` parses into this type
// and then discards the result, so the payloads are never read.
#[allow(dead_code)]
#[derive(Debug)]
enum ParsedGitHubEvent {
    /// `issues` event.
    Issues(IssuesEvent),
    /// `pull_request` event.
    PullRequest(PullRequestEvent),
    /// `issue_comment` event.
    IssueComment(IssueCommentEvent),
    /// `pull_request_review` event.
    PullRequestReview(PullRequestReviewEvent),
    /// `push` event.
    Push(PushEvent),
    /// `workflow_run` event.
    WorkflowRun(WorkflowRunEvent),
    /// `deployment_status` event.
    DeploymentStatus(DeploymentStatusEvent),
    /// `check_run` event.
    CheckRun(CheckRunEvent),
    /// Any other event kind, kept as the raw JSON payload.
    Other { kind: String, raw: JsonValue },
}
104
/// Payload fields deserialized for an `issues` event.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct IssuesEvent {
    /// Optional `action` field of the payload.
    action: Option<String>,
    /// The `issue` object, kept as raw JSON.
    issue: JsonValue,
}
111
/// Payload fields deserialized for a `pull_request` event.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct PullRequestEvent {
    /// Optional `action` field of the payload.
    action: Option<String>,
    /// The `pull_request` object, kept as raw JSON.
    pull_request: JsonValue,
}
118
/// Payload fields deserialized for an `issue_comment` event.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct IssueCommentEvent {
    /// Optional `action` field of the payload.
    action: Option<String>,
    /// The `comment` object, kept as raw JSON.
    comment: JsonValue,
    /// The `issue` object, kept as raw JSON.
    issue: JsonValue,
}
126
/// Payload fields deserialized for a `pull_request_review` event.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct PullRequestReviewEvent {
    /// Optional `action` field of the payload.
    action: Option<String>,
    /// The `review` object, kept as raw JSON.
    review: JsonValue,
    /// The `pull_request` object, kept as raw JSON.
    pull_request: JsonValue,
}
134
/// Payload fields deserialized for a `push` event; both fields are optional.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct PushEvent {
    /// The `commits` array; empty when the field is absent.
    #[serde(default)]
    commits: Vec<JsonValue>,
    /// The `distinct_size` field; `None` when absent.
    #[serde(default)]
    distinct_size: Option<u64>,
}
143
/// Payload fields deserialized for a `workflow_run` event.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct WorkflowRunEvent {
    /// Optional `action` field of the payload.
    action: Option<String>,
    /// The `workflow_run` object, kept as raw JSON.
    workflow_run: JsonValue,
}
150
/// Payload fields deserialized for a `deployment_status` event.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct DeploymentStatusEvent {
    /// Optional `action` field of the payload.
    action: Option<String>,
    /// The `deployment_status` object, kept as raw JSON.
    deployment_status: JsonValue,
    /// The `deployment` object, kept as raw JSON.
    deployment: JsonValue,
}
158
/// Payload fields deserialized for a `check_run` event.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct CheckRunEvent {
    /// Optional `action` field of the payload.
    action: Option<String>,
    /// The `check_run` object, kept as raw JSON.
    check_run: JsonValue,
}
165
/// GitHub-specific portion of a trigger binding's `config` value.
#[derive(Debug, Default, Deserialize)]
struct GitHubBindingConfig {
    /// The `match` section; defaults to an empty matcher when absent.
    #[serde(default, rename = "match")]
    match_config: GitHubMatchConfig,
    /// The `secrets` section; defaults to no secrets when absent.
    #[serde(default)]
    secrets: GitHubSecretsConfig,
}
173
/// Matching options of a GitHub binding.
#[derive(Debug, Default, Deserialize)]
struct GitHubMatchConfig {
    /// Optional path; must be unique across the bindings passed to `activate`.
    path: Option<String>,
}
178
/// Secret references, written as text that `parse_secret_id` understands.
#[derive(Debug, Default, Deserialize)]
struct GitHubSecretsConfig {
    /// Reference to the webhook signing secret (required for inbound bindings).
    signing_secret: Option<String>,
    /// Reference to the app private key (used by the outbound client).
    private_key: Option<String>,
}
184
/// Authentication and endpoint settings accepted by every outbound method.
#[derive(Debug, Default, Deserialize)]
struct GitHubClientConfigArgs {
    /// GitHub App id; required by `resolve_client_config`.
    app_id: Option<u64>,
    /// Installation id; required by `resolve_client_config`.
    installation_id: Option<u64>,
    /// Optional API base URL override.
    api_base_url: Option<String>,
    /// Inline PEM private key, used when no secret reference resolves.
    private_key_pem: Option<String>,
    /// Secret reference for the private key.
    private_key_secret: Option<String>,
    /// Nested `secrets` section; its `private_key` is an alternative reference.
    #[serde(default)]
    secrets: GitHubSecretsConfig,
}
195
/// Arguments of the `comment` outbound method.
#[derive(Debug, Deserialize)]
struct CommentArgs {
    /// Client settings, read from the same JSON object as the other fields.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the issue to comment on.
    issue_url: String,
    /// Comment text, sent as the request's `body` field.
    body: String,
}
203
/// Arguments of the `add_labels` outbound method.
#[derive(Debug, Deserialize)]
struct AddLabelsArgs {
    /// Client settings, read from the same JSON object as the other fields.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the issue to label.
    issue_url: String,
    /// Labels sent as the request's `labels` field.
    labels: Vec<String>,
}
211
/// Arguments of the `request_review` outbound method.
#[derive(Debug, Deserialize)]
struct RequestReviewArgs {
    /// Client settings, read from the same JSON object as the other fields.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the pull request.
    pr_url: String,
    /// Reviewers sent as the request's `reviewers` field.
    reviewers: Vec<String>,
}
219
/// Arguments of the `merge_pr` outbound method.
#[derive(Debug, Deserialize)]
struct MergePrArgs {
    /// Client settings, read from the same JSON object as the other fields.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the pull request to merge.
    pr_url: String,
    /// Optional `commit_title` forwarded in the merge request body.
    #[serde(default)]
    commit_title: Option<String>,
    /// Optional `commit_message` forwarded in the merge request body.
    #[serde(default)]
    commit_message: Option<String>,
    /// Optional `merge_method` forwarded in the merge request body.
    #[serde(default)]
    merge_method: Option<String>,
    /// Optional `sha` forwarded in the merge request body.
    #[serde(default)]
    sha: Option<String>,
    /// When true, the response object is annotated with
    /// `admin_override_requested: true`; the request itself is unchanged.
    #[serde(default)]
    admin_override: bool,
}
236
/// Arguments of the `list_stale_prs` outbound method.
#[derive(Debug, Deserialize)]
struct ListStalePrsArgs {
    /// Client settings, read from the same JSON object as the other fields.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// Repository reference, parsed by `parse_repo_ref`.
    repo: String,
    /// Age threshold in days; open PRs last updated before `now - days` are listed.
    days: i64,
}
244
/// Arguments of the `get_pr_diff` outbound method.
#[derive(Debug, Deserialize)]
struct GetPrDiffArgs {
    /// Client settings, read from the same JSON object as the other fields.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the pull request whose diff is fetched.
    pr_url: String,
}
251
/// Arguments of the `create_issue` outbound method.
#[derive(Debug, Deserialize)]
struct CreateIssueArgs {
    /// Client settings, read from the same JSON object as the other fields.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// Repository reference, parsed by `parse_repo_ref`.
    repo: String,
    /// Issue title (always sent).
    title: String,
    /// Optional issue body; omitted from the request when `None`.
    #[serde(default)]
    body: Option<String>,
    /// Optional labels; omitted from the request when `None`.
    #[serde(default)]
    labels: Option<Vec<String>>,
}
263
/// Arguments of the generic `api_call` outbound method.
#[derive(Debug, Deserialize)]
struct ApiCallArgs {
    /// Client settings, read from the same JSON object as the other fields.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// Request path, resolved against the API base URL by `absolute_api_url`.
    path: String,
    /// HTTP method name, parsed with `Method::from_bytes`.
    method: String,
    /// Optional JSON request body.
    #[serde(default)]
    body: Option<JsonValue>,
    /// Optional `Accept` header; defaults to `application/vnd.github+json`.
    #[serde(default)]
    accept: Option<String>,
}
275
/// Fields read from the installation access-token endpoint's JSON response.
#[derive(Debug, Deserialize)]
struct InstallationTokenResponse {
    /// The installation access token.
    token: String,
    /// Optional expiry text, passed to `installation_token_refresh_at`.
    #[serde(default)]
    expires_at: Option<String>,
}
282
/// Claims of the JWT used to authenticate as the GitHub App.
#[derive(Debug, Serialize)]
struct GitHubJwtClaims {
    /// Issued-at time as a Unix timestamp (set 60 seconds in the past).
    iat: i64,
    /// Expiry time as a Unix timestamp (set 9 minutes in the future).
    exp: i64,
    /// Issuer: the app id rendered as text.
    iss: String,
}
289
/// A repository identified by owner and name.
#[derive(Clone, Debug)]
struct RepoRef {
    /// Owner segment of the repository path.
    owner: String,
    /// Repository name segment.
    repo: String,
}
295
/// An issue or pull request identified by repository and number.
#[derive(Clone, Debug)]
struct IssueRef {
    /// Repository containing the issue or pull request.
    repo: RepoRef,
    /// Issue or pull request number used in API paths.
    number: u64,
}
301
302impl GitHubConnector {
303 pub fn new() -> Self {
304 let state = Arc::new(GitHubConnectorState::default());
305 let client = Arc::new(GitHubClient {
306 provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
307 state: state.clone(),
308 http: crate::connectors::outbound_http_client("harn-github-connector"),
309 tokens: GitHubInstallationTokenStore::new(32),
310 });
311 Self {
312 provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
313 kinds: vec![TriggerKind::from("webhook")],
314 state,
315 client,
316 }
317 }
318
319 fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedGitHubBinding, ConnectorError> {
320 let bindings = self
321 .state
322 .bindings
323 .read()
324 .expect("github connector bindings poisoned");
325 if let Some(binding_id) = raw.metadata.get("binding_id").and_then(JsonValue::as_str) {
326 return bindings.get(binding_id).cloned().ok_or_else(|| {
327 ConnectorError::Unsupported(format!(
328 "github connector has no active binding `{binding_id}`"
329 ))
330 });
331 }
332 if bindings.len() == 1 {
333 return bindings
334 .values()
335 .next()
336 .cloned()
337 .ok_or_else(|| ConnectorError::Activation("github bindings missing".to_string()));
338 }
339 Err(ConnectorError::Unsupported(
340 "github connector requires raw.metadata.binding_id when multiple bindings are active"
341 .to_string(),
342 ))
343 }
344
345 fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
346 self.state
347 .ctx
348 .read()
349 .expect("github connector ctx poisoned")
350 .clone()
351 .ok_or_else(|| {
352 ConnectorError::Activation(
353 "github connector must be initialized before use".to_string(),
354 )
355 })
356 }
357}
358
359impl Default for GitHubConnector {
360 fn default() -> Self {
361 Self::new()
362 }
363}
364
365#[async_trait]
366impl Connector for GitHubConnector {
367 fn provider_id(&self) -> &ProviderId {
368 &self.provider_id
369 }
370
371 fn kinds(&self) -> &[TriggerKind] {
372 &self.kinds
373 }
374
375 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
376 *self
377 .state
378 .ctx
379 .write()
380 .expect("github connector ctx poisoned") = Some(ctx);
381 Ok(())
382 }
383
384 async fn activate(
385 &self,
386 bindings: &[TriggerBinding],
387 ) -> Result<ActivationHandle, ConnectorError> {
388 let mut configured = HashMap::new();
389 let mut paths = BTreeSet::new();
390 for binding in bindings {
391 let activated = ActivatedGitHubBinding::from_binding(binding)?;
392 if let Some(path) = &activated.path {
393 if !paths.insert(path.clone()) {
394 return Err(ConnectorError::Activation(format!(
395 "github connector path `{path}` is configured by multiple bindings"
396 )));
397 }
398 }
399 configured.insert(binding.binding_id.clone(), activated);
400 }
401 *self
402 .state
403 .bindings
404 .write()
405 .expect("github connector bindings poisoned") = configured;
406 Ok(ActivationHandle::new(
407 self.provider_id.clone(),
408 bindings.len(),
409 ))
410 }
411
412 async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
413 let ctx = self.ctx()?;
414 let binding = self.binding_for_raw(&raw)?;
415 let provider = self.provider_id.clone();
416 let received_at = raw.received_at;
417 let headers = effective_headers(&raw.headers);
418 let secret = load_secret_text_blocking(&ctx, &binding.signing_secret)?;
419 futures::executor::block_on(crate::connectors::hmac::verify_hmac_signed(
420 ctx.event_log.as_ref(),
421 &provider,
422 HmacSignatureStyle::github(),
423 &raw.body,
424 &headers,
425 secret.as_str(),
426 None,
427 received_at,
428 ))?;
429
430 let payload = raw.json_body()?;
431 let event_kind = header_value(&headers, "x-github-event")
432 .map(ToString::to_string)
433 .or_else(|| {
434 if raw.kind.trim().is_empty() {
435 None
436 } else {
437 Some(raw.kind.clone())
438 }
439 })
440 .ok_or_else(|| ConnectorError::MissingHeader("X-GitHub-Event".to_string()))?;
441 let _typed = parse_typed_event(&event_kind, &payload)?;
442 let dedupe_key = header_value(&headers, "x-github-delivery")
443 .map(ToString::to_string)
444 .unwrap_or_else(|| fallback_body_digest(&raw.body));
445 if binding.dedupe_enabled {
446 let inserted = ctx
447 .inbox
448 .insert_if_new(&binding.binding_id, &dedupe_key, binding.dedupe_ttl)
449 .await?;
450 if !inserted {
451 return Err(ConnectorError::DuplicateDelivery(format!(
452 "duplicate GitHub delivery `{dedupe_key}` for binding `{}`",
453 binding.binding_id
454 )));
455 }
456 }
457
458 let provider_payload =
459 ProviderPayload::normalize(&provider, &event_kind, &headers, payload)
460 .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
461 Ok(TriggerEvent {
462 id: TriggerEventId::new(),
463 provider,
464 kind: event_kind,
465 received_at,
466 occurred_at: raw.occurred_at,
467 dedupe_key,
468 trace_id: TraceId::new(),
469 tenant_id: raw.tenant_id.clone(),
470 headers: redact_headers(&headers, &HeaderRedactionPolicy::default()),
471 batch: None,
472 raw_body: Some(raw.body.clone()),
473 provider_payload,
474 signature_status: SignatureStatus::Verified,
475 dedupe_claimed: false,
476 })
477 }
478
479 fn payload_schema(&self) -> ProviderPayloadSchema {
480 ProviderPayloadSchema::named("GitHubEventPayload")
481 }
482
483 fn client(&self) -> Arc<dyn ConnectorClient> {
484 self.client.clone()
485 }
486}
487
488#[async_trait]
489impl ConnectorClient for GitHubClient {
490 async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError> {
491 match method {
492 "comment" => {
493 let args: CommentArgs = parse_args(args)?;
494 let config = self.resolve_client_config(&args.config)?;
495 let issue = parse_issue_like_url(&args.issue_url, "issue")?;
496 self.request_json(
497 &config,
498 Method::POST,
499 &format!(
500 "/repos/{}/{}/issues/{}/comments",
501 issue.repo.owner, issue.repo.repo, issue.number
502 ),
503 Some(json!({ "body": args.body })),
504 "application/vnd.github+json",
505 )
506 .await
507 }
508 "add_labels" => {
509 let args: AddLabelsArgs = parse_args(args)?;
510 let config = self.resolve_client_config(&args.config)?;
511 let issue = parse_issue_like_url(&args.issue_url, "issue")?;
512 self.request_json(
513 &config,
514 Method::POST,
515 &format!(
516 "/repos/{}/{}/issues/{}/labels",
517 issue.repo.owner, issue.repo.repo, issue.number
518 ),
519 Some(json!({ "labels": args.labels })),
520 "application/vnd.github+json",
521 )
522 .await
523 }
524 "request_review" => {
525 let args: RequestReviewArgs = parse_args(args)?;
526 let config = self.resolve_client_config(&args.config)?;
527 let pr = parse_issue_like_url(&args.pr_url, "pull")?;
528 self.request_json(
529 &config,
530 Method::POST,
531 &format!(
532 "/repos/{}/{}/pulls/{}/requested_reviewers",
533 pr.repo.owner, pr.repo.repo, pr.number
534 ),
535 Some(json!({ "reviewers": args.reviewers })),
536 "application/vnd.github+json",
537 )
538 .await
539 }
540 "merge_pr" => {
541 let args: MergePrArgs = parse_args(args)?;
542 let config = self.resolve_client_config(&args.config)?;
543 let pr = parse_issue_like_url(&args.pr_url, "pull")?;
544 let mut body = JsonMap::new();
545 if let Some(value) = args.commit_title {
546 body.insert("commit_title".to_string(), JsonValue::String(value));
547 }
548 if let Some(value) = args.commit_message {
549 body.insert("commit_message".to_string(), JsonValue::String(value));
550 }
551 if let Some(value) = args.merge_method {
552 body.insert("merge_method".to_string(), JsonValue::String(value));
553 }
554 if let Some(value) = args.sha {
555 body.insert("sha".to_string(), JsonValue::String(value));
556 }
557 let mut response = self
558 .request_json(
559 &config,
560 Method::PUT,
561 &format!(
562 "/repos/{}/{}/pulls/{}/merge",
563 pr.repo.owner, pr.repo.repo, pr.number
564 ),
565 Some(JsonValue::Object(body)),
566 "application/vnd.github+json",
567 )
568 .await?;
569 if args.admin_override {
570 if let Some(map) = response.as_object_mut() {
571 map.insert(
572 "admin_override_requested".to_string(),
573 JsonValue::Bool(true),
574 );
575 }
576 }
577 Ok(response)
578 }
579 "list_stale_prs" => {
580 let args: ListStalePrsArgs = parse_args(args)?;
581 let config = self.resolve_client_config(&args.config)?;
582 let repo = parse_repo_ref(&args.repo)?;
583 let stale_before = (OffsetDateTime::now_utc() - Duration::days(args.days))
584 .date()
585 .to_string();
586 let query = format!(
587 "repo:{}/{} is:pr is:open updated:<{}",
588 repo.owner, repo.repo, stale_before
589 );
590 let url = Url::parse_with_params(
591 &absolute_api_url(&config.api_base_url, "/search/issues")?,
592 &[("q", query)],
593 )
594 .map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
595 let response = self
596 .request_response(
597 &config,
598 Method::GET,
599 url.to_string(),
600 None,
601 "application/vnd.github+json",
602 )
603 .await?;
604 response
605 .json::<JsonValue>()
606 .await
607 .map_err(|error| ClientError::Transport(error.to_string()))
608 }
609 "get_pr_diff" => {
610 let args: GetPrDiffArgs = parse_args(args)?;
611 let config = self.resolve_client_config(&args.config)?;
612 let pr = parse_issue_like_url(&args.pr_url, "pull")?;
613 let text = self
614 .request_text(
615 &config,
616 Method::GET,
617 &format!(
618 "/repos/{}/{}/pulls/{}",
619 pr.repo.owner, pr.repo.repo, pr.number
620 ),
621 "application/vnd.github.diff",
622 )
623 .await?;
624 Ok(JsonValue::String(text))
625 }
626 "create_issue" => {
627 let args: CreateIssueArgs = parse_args(args)?;
628 let config = self.resolve_client_config(&args.config)?;
629 let repo = parse_repo_ref(&args.repo)?;
630 let mut body = JsonMap::new();
631 body.insert("title".to_string(), JsonValue::String(args.title));
632 if let Some(value) = args.body {
633 body.insert("body".to_string(), JsonValue::String(value));
634 }
635 if let Some(labels) = args.labels {
636 body.insert("labels".to_string(), json!(labels));
637 }
638 self.request_json(
639 &config,
640 Method::POST,
641 &format!("/repos/{}/{}/issues", repo.owner, repo.repo),
642 Some(JsonValue::Object(body)),
643 "application/vnd.github+json",
644 )
645 .await
646 }
647 "api_call" => {
648 let args: ApiCallArgs = parse_args(args)?;
649 let config = self.resolve_client_config(&args.config)?;
650 let method = Method::from_bytes(args.method.as_bytes())
651 .map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
652 self.request_json(
653 &config,
654 method,
655 &args.path,
656 args.body,
657 args.accept
658 .as_deref()
659 .unwrap_or("application/vnd.github+json"),
660 )
661 .await
662 }
663 other => Err(ClientError::MethodNotFound(format!(
664 "github connector does not implement outbound method `{other}`"
665 ))),
666 }
667 }
668}
669
670impl GitHubClient {
671 fn resolve_client_config(
672 &self,
673 args: &GitHubClientConfigArgs,
674 ) -> Result<ResolvedGitHubClientConfig, ClientError> {
675 let app_id = args.app_id.ok_or_else(|| {
676 ClientError::InvalidArgs("github connector requires app_id".to_string())
677 })?;
678 let installation_id = args.installation_id.ok_or_else(|| {
679 ClientError::InvalidArgs("github connector requires installation_id".to_string())
680 })?;
681 let private_key = if let Some(secret_id) = args
682 .private_key_secret
683 .as_deref()
684 .or(args.secrets.private_key.as_deref())
685 .and_then(|value| parse_secret_id(Some(value)))
686 {
687 PrivateKeySource::Secret(secret_id)
688 } else if let Some(pem) = args.private_key_pem.clone() {
689 PrivateKeySource::Inline(pem)
690 } else {
691 return Err(ClientError::InvalidArgs(
692 "github connector requires private_key_secret or private_key_pem".to_string(),
693 ));
694 };
695 Ok(ResolvedGitHubClientConfig {
696 app_id,
697 installation_id,
698 api_base_url: args
699 .api_base_url
700 .clone()
701 .unwrap_or_else(|| DEFAULT_API_BASE_URL.to_string()),
702 private_key,
703 })
704 }
705
706 fn ctx(&self) -> Result<ConnectorCtx, ClientError> {
707 self.state
708 .ctx
709 .read()
710 .expect("github connector ctx poisoned")
711 .clone()
712 .ok_or_else(|| ClientError::Other("github connector must be initialized".to_string()))
713 }
714
715 async fn request_json(
716 &self,
717 config: &ResolvedGitHubClientConfig,
718 method: Method,
719 path: &str,
720 body: Option<JsonValue>,
721 accept: &str,
722 ) -> Result<JsonValue, ClientError> {
723 let url = absolute_api_url(&config.api_base_url, path)?;
724 let response = self
725 .request_response(config, method, url, body, accept)
726 .await?;
727 response
728 .json::<JsonValue>()
729 .await
730 .map_err(|error| ClientError::Transport(error.to_string()))
731 }
732
733 async fn request_text(
734 &self,
735 config: &ResolvedGitHubClientConfig,
736 method: Method,
737 path: &str,
738 accept: &str,
739 ) -> Result<String, ClientError> {
740 let url = absolute_api_url(&config.api_base_url, path)?;
741 let response = self
742 .request_response(config, method, url, None, accept)
743 .await?;
744 response
745 .text()
746 .await
747 .map_err(|error| ClientError::Transport(error.to_string()))
748 }
749
750 async fn request_response(
751 &self,
752 config: &ResolvedGitHubClientConfig,
753 method: Method,
754 url: String,
755 body: Option<JsonValue>,
756 accept: &str,
757 ) -> Result<Response, ClientError> {
758 let mut retried_401 = false;
759 let mut retried_rate_limit = false;
760 loop {
761 let token = self.installation_token(config).await?;
762 let token_text = token.with_exposed(|bytes| String::from_utf8_lossy(bytes).to_string());
763 let ctx = self.ctx()?;
764 ctx.rate_limiter
765 .scoped(
766 &self.provider_id,
767 format!("installation:{}", config.installation_id),
768 )
769 .acquire()
770 .await;
771 if let Some(error) = crate::egress::client_error_for_url("connector_call:github", &url)
772 {
773 return Err(error);
774 }
775
776 let mut request = self
777 .http
778 .request(method.clone(), &url)
779 .header("Accept", accept)
780 .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
781 .header("Authorization", format!("Bearer {token_text}"));
782 if let Some(payload) = body.clone() {
783 request = request.json(&payload);
784 }
785 let response = request
786 .send()
787 .await
788 .map_err(|error| ClientError::Transport(error.to_string()))?;
789 self.record_rate_limit_observation(&response).await;
790
791 if response.status() == StatusCode::UNAUTHORIZED && !retried_401 {
792 self.tokens.invalidate(config.installation_id);
793 retried_401 = true;
794 continue;
795 }
796 if is_rate_limited(&response) && !retried_rate_limit {
797 tokio::time::sleep(rate_limit_backoff(&response)).await;
798 retried_rate_limit = true;
799 continue;
800 }
801 if !response.status().is_success() {
802 let status = response.status();
803 let body = response.text().await.unwrap_or_default();
804 let message = if body.trim().is_empty() {
805 format!("github API request failed with status {status}")
806 } else {
807 format!("github API request failed with status {status}: {body}")
808 };
809 return Err(if is_rate_limited_status(status) {
810 ClientError::RateLimited(message)
811 } else {
812 ClientError::Transport(message)
813 });
814 }
815 return Ok(response);
816 }
817 }
818
819 async fn installation_token(
820 &self,
821 config: &ResolvedGitHubClientConfig,
822 ) -> Result<SecretBytes, ClientError> {
823 let now = OffsetDateTime::now_utc();
824 if let Some(token) = self.tokens.get(config.installation_id, now) {
825 return Ok(token);
826 }
827 let jwt = self.mint_app_jwt(config).await?;
828 let url = absolute_api_url(
829 &config.api_base_url,
830 &format!(
831 "/app/installations/{}/access_tokens",
832 config.installation_id
833 ),
834 )?;
835 let ctx = self.ctx()?;
836 ctx.rate_limiter
837 .scoped(
838 &self.provider_id,
839 format!("installation:{}", config.installation_id),
840 )
841 .acquire()
842 .await;
843 if let Some(error) = crate::egress::client_error_for_url("connector_call:github", &url) {
844 return Err(error);
845 }
846 let response = self
847 .http
848 .post(url)
849 .header("Accept", "application/vnd.github+json")
850 .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
851 .header("Authorization", format!("Bearer {jwt}"))
852 .send()
853 .await
854 .map_err(|error| ClientError::Transport(error.to_string()))?;
855 self.record_rate_limit_observation(&response).await;
856 if !response.status().is_success() {
857 let status = response.status();
858 let body = response.text().await.unwrap_or_default();
859 return Err(ClientError::Transport(format!(
860 "failed to create GitHub installation token ({status}): {body}"
861 )));
862 }
863 let payload: InstallationTokenResponse = response
864 .json()
865 .await
866 .map_err(|error| ClientError::Transport(error.to_string()))?;
867 let refresh_at = installation_token_refresh_at(payload.expires_at.as_deref(), now);
868 let token = SecretBytes::from(payload.token);
869 let result = token.reborrow();
870 self.tokens.store(config.installation_id, token, refresh_at);
871 Ok(result)
872 }
873
874 async fn mint_app_jwt(
875 &self,
876 config: &ResolvedGitHubClientConfig,
877 ) -> Result<String, ClientError> {
878 let pem = self.private_key_pem(config).await?;
879 let now = OffsetDateTime::now_utc();
880 let claims = GitHubJwtClaims {
881 iat: (now - Duration::seconds(60)).unix_timestamp(),
882 exp: (now + Duration::minutes(9)).unix_timestamp(),
883 iss: config.app_id.to_string(),
884 };
885 let header = Header::new(Algorithm::RS256);
886 let key = EncodingKey::from_rsa_pem(pem.as_bytes())
887 .map_err(|error| ClientError::Other(error.to_string()))?;
888 jsonwebtoken::encode(&header, &claims, &key)
889 .map_err(|error| ClientError::Other(error.to_string()))
890 }
891
892 async fn private_key_pem(
893 &self,
894 config: &ResolvedGitHubClientConfig,
895 ) -> Result<String, ClientError> {
896 match &config.private_key {
897 PrivateKeySource::Inline(pem) => Ok(pem.clone()),
898 PrivateKeySource::Secret(id) => {
899 let ctx = self.ctx()?;
900 let secret = ctx
901 .secrets
902 .get(id)
903 .await
904 .map_err(|error| ClientError::Other(error.to_string()))?;
905 secret.with_exposed(|bytes| {
906 std::str::from_utf8(bytes)
907 .map(|value| value.to_string())
908 .map_err(|error| ClientError::Other(error.to_string()))
909 })
910 }
911 }
912 }
913
914 async fn record_rate_limit_observation(&self, response: &Response) {
915 let remaining = response
916 .headers()
917 .get("x-ratelimit-remaining")
918 .and_then(|value| value.to_str().ok())
919 .and_then(|value| value.parse::<u64>().ok());
920 let reset = response
921 .headers()
922 .get("x-ratelimit-reset")
923 .and_then(|value| value.to_str().ok())
924 .and_then(|value| value.parse::<i64>().ok());
925 let Some(remaining) = remaining else {
926 return;
927 };
928 if remaining > 10 {
929 return;
930 }
931 let Ok(ctx) = self.ctx() else {
932 return;
933 };
934 let Ok(topic) = Topic::new(GITHUB_RATE_LIMIT_TOPIC) else {
935 return;
936 };
937 let _ = ctx
938 .event_log
939 .append(
940 &topic,
941 LogEvent::new(
942 "github.rate_limit",
943 json!({
944 "remaining": remaining,
945 "reset": reset,
946 "status": response.status().as_u16(),
947 }),
948 ),
949 )
950 .await;
951 }
952}
953
954impl ActivatedGitHubBinding {
955 fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
956 let config: GitHubBindingConfig =
957 serde_json::from_value(binding.config.clone()).map_err(|error| {
958 ConnectorError::Activation(format!(
959 "github binding `{}` has invalid config: {error}",
960 binding.binding_id
961 ))
962 })?;
963 let signing_secret =
964 parse_secret_id(config.secrets.signing_secret.as_deref()).ok_or_else(|| {
965 ConnectorError::Activation(format!(
966 "github binding `{}` requires secrets.signing_secret",
967 binding.binding_id
968 ))
969 })?;
970 Ok(Self {
971 binding_id: binding.binding_id.clone(),
972 path: config.match_config.path,
973 signing_secret,
974 dedupe_enabled: binding.dedupe_key.is_some(),
975 dedupe_ttl: std::time::Duration::from_secs(
976 u64::from(crate::triggers::DEFAULT_INBOX_RETENTION_DAYS) * 24 * 60 * 60,
977 ),
978 })
979 }
980}
981
982impl GitHubInstallationTokenStore {
983 fn new(capacity: usize) -> Self {
984 Self {
985 capacity: capacity.max(1),
986 state: Mutex::new(TokenCacheState::default()),
987 }
988 }
989
990 fn get(&self, installation_id: u64, now: OffsetDateTime) -> Option<SecretBytes> {
991 let mut state = self.state.lock().expect("github token cache poisoned");
992 let refresh_at = state
993 .entries
994 .get(&installation_id)
995 .map(|entry| entry.refresh_at)?;
996 if refresh_at <= now {
997 state.entries.remove(&installation_id);
998 state.order.retain(|id| *id != installation_id);
999 return None;
1000 }
1001 touch_lru(&mut state.order, installation_id);
1002 state
1003 .entries
1004 .get(&installation_id)
1005 .map(|entry| entry.token.reborrow())
1006 }
1007
1008 fn store(&self, installation_id: u64, token: SecretBytes, refresh_at: OffsetDateTime) {
1009 let mut state = self.state.lock().expect("github token cache poisoned");
1010 state.entries.insert(
1011 installation_id,
1012 InstallationTokenEntry { token, refresh_at },
1013 );
1014 touch_lru(&mut state.order, installation_id);
1015 while state.entries.len() > self.capacity {
1016 if let Some(evicted) = state.order.pop_front() {
1017 state.entries.remove(&evicted);
1018 }
1019 }
1020 }
1021
1022 fn invalidate(&self, installation_id: u64) {
1023 let mut state = self.state.lock().expect("github token cache poisoned");
1024 state.entries.remove(&installation_id);
1025 state.order.retain(|id| *id != installation_id);
1026 }
1027}
1028
/// Marks `installation_id` as most recently used: every existing occurrence
/// is removed from `order` and the id is appended at the back.
fn touch_lru(order: &mut VecDeque<u64>, installation_id: u64) {
    let mut refreshed: VecDeque<u64> = order
        .drain(..)
        .filter(|id| *id != installation_id)
        .collect();
    refreshed.push_back(installation_id);
    *order = refreshed;
}
1033
1034fn parse_args<T: DeserializeOwned>(args: JsonValue) -> Result<T, ClientError> {
1035 serde_json::from_value(args).map_err(|error| ClientError::InvalidArgs(error.to_string()))
1036}
1037
1038fn parse_typed_event(kind: &str, payload: &JsonValue) -> Result<ParsedGitHubEvent, ConnectorError> {
1039 match kind {
1040 "issues" => serde_json::from_value(payload.clone())
1041 .map(ParsedGitHubEvent::Issues)
1042 .map_err(|error| ConnectorError::Json(error.to_string())),
1043 "pull_request" => serde_json::from_value(payload.clone())
1044 .map(ParsedGitHubEvent::PullRequest)
1045 .map_err(|error| ConnectorError::Json(error.to_string())),
1046 "issue_comment" => serde_json::from_value(payload.clone())
1047 .map(ParsedGitHubEvent::IssueComment)
1048 .map_err(|error| ConnectorError::Json(error.to_string())),
1049 "pull_request_review" => serde_json::from_value(payload.clone())
1050 .map(ParsedGitHubEvent::PullRequestReview)
1051 .map_err(|error| ConnectorError::Json(error.to_string())),
1052 "push" => serde_json::from_value(payload.clone())
1053 .map(ParsedGitHubEvent::Push)
1054 .map_err(|error| ConnectorError::Json(error.to_string())),
1055 "workflow_run" => serde_json::from_value(payload.clone())
1056 .map(ParsedGitHubEvent::WorkflowRun)
1057 .map_err(|error| ConnectorError::Json(error.to_string())),
1058 "deployment_status" => serde_json::from_value(payload.clone())
1059 .map(ParsedGitHubEvent::DeploymentStatus)
1060 .map_err(|error| ConnectorError::Json(error.to_string())),
1061 "check_run" => serde_json::from_value(payload.clone())
1062 .map(ParsedGitHubEvent::CheckRun)
1063 .map_err(|error| ConnectorError::Json(error.to_string())),
1064 other => Ok(ParsedGitHubEvent::Other {
1065 kind: other.to_string(),
1066 raw: payload.clone(),
1067 }),
1068 }
1069}
1070
1071fn effective_headers(headers: &BTreeMap<String, String>) -> BTreeMap<String, String> {
1072 let mut effective = headers.clone();
1073 canonicalize_header(headers, &mut effective, "content-type", "Content-Type");
1074 canonicalize_header(headers, &mut effective, "x-github-event", "X-GitHub-Event");
1075 canonicalize_header(
1076 headers,
1077 &mut effective,
1078 "x-github-delivery",
1079 "X-GitHub-Delivery",
1080 );
1081 canonicalize_header(
1082 headers,
1083 &mut effective,
1084 "x-github-hook-id",
1085 "X-GitHub-Hook-Id",
1086 );
1087 canonicalize_header(
1088 headers,
1089 &mut effective,
1090 "x-hub-signature-256",
1091 "X-Hub-Signature-256",
1092 );
1093 effective
1094}
1095
1096fn canonicalize_header(
1097 source: &BTreeMap<String, String>,
1098 target: &mut BTreeMap<String, String>,
1099 lookup_name: &str,
1100 canonical_name: &str,
1101) {
1102 if let Some(value) = header_value(source, lookup_name) {
1103 target
1104 .entry(canonical_name.to_string())
1105 .or_insert_with(|| value.to_string());
1106 }
1107}
1108
/// Looks up a header by ASCII-case-insensitive name.
///
/// When several keys differ only by case, the first one in the map's key
/// order wins.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
1115
1116fn parse_secret_id(raw: Option<&str>) -> Option<SecretId> {
1117 let trimmed = raw?.trim();
1118 if trimmed.is_empty() {
1119 return None;
1120 }
1121 let (base, version) = match trimmed.rsplit_once('@') {
1122 Some((base, version_text)) => {
1123 let version = version_text.parse::<u64>().ok()?;
1124 (base, SecretVersion::Exact(version))
1125 }
1126 None => (trimmed, SecretVersion::Latest),
1127 };
1128 let (namespace, name) = base.split_once('/')?;
1129 if namespace.is_empty() || name.is_empty() {
1130 return None;
1131 }
1132 Some(SecretId::new(namespace, name).with_version(version))
1133}
1134
1135fn load_secret_text_blocking(
1136 ctx: &ConnectorCtx,
1137 secret_id: &SecretId,
1138) -> Result<String, ConnectorError> {
1139 let secret = futures::executor::block_on(ctx.secrets.get(secret_id))?;
1140 secret.with_exposed(|bytes| {
1141 std::str::from_utf8(bytes)
1142 .map(|value| value.to_string())
1143 .map_err(|error| {
1144 ConnectorError::Secret(format!(
1145 "github secret `{secret_id}` is not valid UTF-8: {error}"
1146 ))
1147 })
1148 })
1149}
1150
1151fn fallback_body_digest(body: &[u8]) -> String {
1152 let digest = Sha256::digest(body);
1153 format!("sha256:{}", hex::encode(digest))
1154}
1155
1156fn parse_repo_ref(input: &str) -> Result<RepoRef, ClientError> {
1157 let trimmed = input.trim().trim_matches('/');
1158 if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1159 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1160 if parts.len() >= 2 {
1161 return Ok(RepoRef {
1162 owner: parts[0].to_string(),
1163 repo: parts[1].to_string(),
1164 });
1165 }
1166 }
1167 if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1168 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1169 if parts.len() >= 2 {
1170 return Ok(RepoRef {
1171 owner: parts[0].to_string(),
1172 repo: parts[1].to_string(),
1173 });
1174 }
1175 }
1176 if let Some((owner, repo)) = trimmed.split_once('/') {
1177 if !owner.is_empty() && !repo.is_empty() {
1178 return Ok(RepoRef {
1179 owner: owner.to_string(),
1180 repo: repo.to_string(),
1181 });
1182 }
1183 }
1184 Err(ClientError::InvalidArgs(format!(
1185 "invalid GitHub repository reference `{input}`"
1186 )))
1187}
1188
1189fn parse_issue_like_url(input: &str, expected_kind: &str) -> Result<IssueRef, ClientError> {
1190 let trimmed = input.trim().trim_matches('/');
1191 if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1192 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1193 if parts.len() >= 4 {
1194 let kind = parts[2];
1195 if kind == expected_kind || (expected_kind == "issue" && kind == "issues") {
1196 return Ok(IssueRef {
1197 repo: RepoRef {
1198 owner: parts[0].to_string(),
1199 repo: parts[1].to_string(),
1200 },
1201 number: parts[3].parse().map_err(|error| {
1202 ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1203 })?,
1204 });
1205 }
1206 }
1207 }
1208 if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1209 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1210 if parts.len() >= 4 {
1211 let kind = parts[2];
1212 if kind == expected_kind
1213 || (expected_kind == "issue" && kind == "issues")
1214 || (expected_kind == "pull" && kind == "pulls")
1215 {
1216 return Ok(IssueRef {
1217 repo: RepoRef {
1218 owner: parts[0].to_string(),
1219 repo: parts[1].to_string(),
1220 },
1221 number: parts[3].parse().map_err(|error| {
1222 ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1223 })?,
1224 });
1225 }
1226 }
1227 }
1228 Err(ClientError::InvalidArgs(format!(
1229 "invalid GitHub {expected_kind} URL `{input}`"
1230 )))
1231}
1232
1233fn absolute_api_url(base_url: &str, path: &str) -> Result<String, ClientError> {
1234 let base = Url::parse(base_url).map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
1235 base.join(path.trim_start_matches('/'))
1236 .map(|value| value.to_string())
1237 .map_err(|error| ClientError::InvalidArgs(error.to_string()))
1238}
1239
1240fn installation_token_refresh_at(expires_at: Option<&str>, now: OffsetDateTime) -> OffsetDateTime {
1241 let eager_refresh = now + Duration::minutes(55);
1242 let Some(expires_at) = expires_at else {
1243 return eager_refresh;
1244 };
1245 let parsed =
1246 OffsetDateTime::parse(expires_at, &time::format_description::well_known::Rfc3339).ok();
1247 parsed
1248 .map(|value| value - Duration::minutes(5))
1249 .map(|value| {
1250 if value < eager_refresh {
1251 value
1252 } else {
1253 eager_refresh
1254 }
1255 })
1256 .unwrap_or(eager_refresh)
1257}
1258
1259fn is_rate_limited(response: &Response) -> bool {
1260 if is_rate_limited_status(response.status()) {
1261 return true;
1262 }
1263 response
1264 .headers()
1265 .get("x-ratelimit-remaining")
1266 .and_then(|value| value.to_str().ok())
1267 .map(|value| value == "0")
1268 .unwrap_or(false)
1269}
1270
1271fn is_rate_limited_status(status: StatusCode) -> bool {
1272 status == StatusCode::TOO_MANY_REQUESTS || status == StatusCode::FORBIDDEN
1273}
1274
1275fn rate_limit_backoff(response: &Response) -> std::time::Duration {
1276 if let Some(delay) = response
1277 .headers()
1278 .get("retry-after")
1279 .and_then(|value| value.to_str().ok())
1280 .and_then(|value| value.parse::<u64>().ok())
1281 {
1282 return std::time::Duration::from_secs(delay);
1283 }
1284 if let Some(reset_at) = response
1285 .headers()
1286 .get("x-ratelimit-reset")
1287 .and_then(|value| value.to_str().ok())
1288 .and_then(|value| value.parse::<i64>().ok())
1289 .and_then(|value| OffsetDateTime::from_unix_timestamp(value).ok())
1290 {
1291 let now = OffsetDateTime::now_utc();
1292 if reset_at > now {
1293 let delta = reset_at - now;
1294 return std::time::Duration::from_secs(delta.whole_seconds().max(0) as u64);
1295 }
1296 }
1297 std::time::Duration::from_millis(100)
1298}