1use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
2use std::sync::{Arc, Mutex, RwLock};
3
4use async_trait::async_trait;
5use jsonwebtoken::{Algorithm, EncodingKey, Header};
6use reqwest::{Method, Response, StatusCode, Url};
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Map as JsonMap, Value as JsonValue};
10use sha2::{Digest, Sha256};
11use time::{Duration, OffsetDateTime};
12
13use crate::connectors::{
14 ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
15 HmacSignatureStyle, ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
16};
17use crate::event_log::{EventLog, LogEvent, Topic};
18use crate::secrets::{SecretBytes, SecretId, SecretVersion};
19use crate::triggers::{
20 redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus, TraceId,
21 TriggerEvent, TriggerEventId,
22};
23
24#[cfg(test)]
25mod tests;
26
/// Provider identifier string; the connector and its client build their `ProviderId` from it.
pub const GITHUB_PROVIDER_ID: &str = "github";
/// Event-log topic that low-remaining-quota observations are appended to.
const GITHUB_RATE_LIMIT_TOPIC: &str = "connectors.github.rate_limit";
/// Value sent in the `X-GitHub-Api-Version` header of every outbound request.
const GITHUB_API_VERSION: &str = "2022-11-28";
/// API base URL used when the caller does not supply `api_base_url`.
const DEFAULT_API_BASE_URL: &str = "https://api.github.com";
31
/// Connector for the GitHub provider.
///
/// Normalizes inbound webhook deliveries through its `Connector` implementation
/// and hands out a shared `GitHubClient` for outbound API calls.
pub struct GitHubConnector {
    /// Provider identifier returned by `provider_id()`.
    provider_id: ProviderId,
    /// Trigger kinds returned by `kinds()`; `new()` registers only `"webhook"`.
    kinds: Vec<TriggerKind>,
    /// State shared with the outbound client (connector context and active bindings).
    state: Arc<GitHubConnectorState>,
    /// Outbound API client returned by `client()`.
    client: Arc<GitHubClient>,
}
38
/// Mutable state shared between the connector and its outbound client.
#[derive(Default)]
struct GitHubConnectorState {
    /// Connector context; `None` until `init` stores one.
    ctx: RwLock<Option<ConnectorCtx>>,
    /// Active bindings keyed by binding id; replaced wholesale by `activate`.
    bindings: RwLock<HashMap<String, ActivatedGitHubBinding>>,
}
44
/// Validated, ready-to-use view of one trigger binding.
#[derive(Clone, Debug)]
struct ActivatedGitHubBinding {
    /// Identifier of the binding this was built from.
    binding_id: String,
    /// Optional `match.path` from the binding config; `activate` rejects duplicates.
    path: Option<String>,
    /// Secret holding the key used to verify the webhook HMAC signature.
    signing_secret: SecretId,
    /// True when the binding declares a `dedupe_key`; enables the inbox duplicate check.
    dedupe_enabled: bool,
    /// Retention passed to the inbox when recording a delivery key.
    dedupe_ttl: std::time::Duration,
}
53
/// Outbound GitHub API client exposed through `ConnectorClient`.
struct GitHubClient {
    /// Provider identifier used to scope the rate limiter.
    provider_id: ProviderId,
    /// State shared with the owning connector; the client reads the context from it.
    state: Arc<GitHubConnectorState>,
    /// HTTP client used for all API requests.
    http: reqwest::Client,
    /// Cache of installation access tokens.
    tokens: GitHubInstallationTokenStore,
}
60
/// Bounded, mutex-guarded cache of installation access tokens.
struct GitHubInstallationTokenStore {
    /// Maximum number of cached tokens; `new` clamps it to at least 1.
    capacity: usize,
    /// Cached entries plus their recency order.
    state: Mutex<TokenCacheState>,
}
65
/// Contents of the installation-token cache.
#[derive(Default)]
struct TokenCacheState {
    /// Cached tokens keyed by installation id.
    entries: HashMap<u64, InstallationTokenEntry>,
    /// Installation ids in recency order: eviction pops the front, use pushes to the back.
    order: VecDeque<u64>,
}
71
/// One cached installation access token.
struct InstallationTokenEntry {
    /// The token material.
    token: SecretBytes,
    /// Instant from which the cache stops serving this entry and drops it.
    refresh_at: OffsetDateTime,
}
76
/// Per-call connection settings after validation by `resolve_client_config`.
#[derive(Clone, Debug)]
struct ResolvedGitHubClientConfig {
    /// GitHub App id; used as the `iss` claim of the app JWT.
    app_id: u64,
    /// Installation whose access token authenticates API requests.
    installation_id: u64,
    /// Base URL that API paths are resolved against.
    api_base_url: String,
    /// Where the app's private key comes from.
    private_key: PrivateKeySource,
}
84
85#[derive(Clone, Debug)]
86enum PrivateKeySource {
87 Inline(String),
88 Secret(SecretId),
89}
90
/// Typed view of a webhook payload, selected by event kind.
///
/// `normalize_inbound` builds this only to validate the payload shape and then
/// discards it, which is why the variants' contents are never read.
#[allow(dead_code)]
#[derive(Debug)]
enum ParsedGitHubEvent {
    /// `issues` event.
    Issues(IssuesEvent),
    /// `pull_request` event.
    PullRequest(PullRequestEvent),
    /// `issue_comment` event.
    IssueComment(IssueCommentEvent),
    /// `pull_request_review` event.
    PullRequestReview(PullRequestReviewEvent),
    /// `push` event.
    Push(PushEvent),
    /// `workflow_run` event.
    WorkflowRun(WorkflowRunEvent),
    /// Any other event kind, kept as untyped JSON.
    Other { kind: String, raw: JsonValue },
}
102
/// Shape check for an `issues` webhook payload; fields are deserialized but not read.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct IssuesEvent {
    /// Optional `action` string from the payload.
    action: Option<String>,
    /// The `issue` object, kept as untyped JSON.
    issue: JsonValue,
}
109
/// Shape check for a `pull_request` webhook payload; fields are deserialized but not read.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct PullRequestEvent {
    /// Optional `action` string from the payload.
    action: Option<String>,
    /// The `pull_request` object, kept as untyped JSON.
    pull_request: JsonValue,
}
116
/// Shape check for an `issue_comment` webhook payload; fields are deserialized but not read.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct IssueCommentEvent {
    /// Optional `action` string from the payload.
    action: Option<String>,
    /// The `comment` object, kept as untyped JSON.
    comment: JsonValue,
    /// The `issue` object the comment belongs to, kept as untyped JSON.
    issue: JsonValue,
}
124
/// Shape check for a `pull_request_review` webhook payload; fields are deserialized but not read.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct PullRequestReviewEvent {
    /// Optional `action` string from the payload.
    action: Option<String>,
    /// The `review` object, kept as untyped JSON.
    review: JsonValue,
    /// The `pull_request` object under review, kept as untyped JSON.
    pull_request: JsonValue,
}
132
/// Shape check for a `push` webhook payload; fields are deserialized but not read.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct PushEvent {
    /// The `commits` array; empty when the key is absent.
    #[serde(default)]
    commits: Vec<JsonValue>,
    /// The `distinct_size` value; `None` when the key is absent.
    #[serde(default)]
    distinct_size: Option<u64>,
}
141
/// Shape check for a `workflow_run` webhook payload; fields are deserialized but not read.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct WorkflowRunEvent {
    /// Optional `action` string from the payload.
    action: Option<String>,
    /// The `workflow_run` object, kept as untyped JSON.
    workflow_run: JsonValue,
}
148
/// Connector-specific configuration deserialized from a trigger binding's `config` value.
#[derive(Debug, Default, Deserialize)]
struct GitHubBindingConfig {
    /// Contents of the `match` key (renamed because `match` is a Rust keyword).
    #[serde(default, rename = "match")]
    match_config: GitHubMatchConfig,
    /// Secret references for this binding.
    #[serde(default)]
    secrets: GitHubSecretsConfig,
}
156
/// The `match` section of a binding's configuration.
#[derive(Debug, Default, Deserialize)]
struct GitHubMatchConfig {
    /// Optional path for the binding; activation rejects the same path on two bindings.
    path: Option<String>,
}
161
/// Secret references, written as text that `parse_secret_id` understands.
#[derive(Debug, Default, Deserialize)]
struct GitHubSecretsConfig {
    /// Reference to the webhook signing secret; required when activating a binding.
    signing_secret: Option<String>,
    /// Reference to the app private key; consulted by the client when
    /// `private_key_secret` is not given.
    private_key: Option<String>,
}
167
168#[derive(Debug, Default, Deserialize)]
169struct GitHubClientConfigArgs {
170 app_id: Option<u64>,
171 installation_id: Option<u64>,
172 api_base_url: Option<String>,
173 private_key_pem: Option<String>,
174 private_key_secret: Option<String>,
175 #[serde(default)]
176 secrets: GitHubSecretsConfig,
177}
178
/// Arguments of the `comment` outbound method.
#[derive(Debug, Deserialize)]
struct CommentArgs {
    /// Connection settings, read from the same JSON object.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the issue to comment on.
    issue_url: String,
    /// Comment text, sent as the `body` field.
    body: String,
}
186
/// Arguments of the `add_labels` outbound method.
#[derive(Debug, Deserialize)]
struct AddLabelsArgs {
    /// Connection settings, read from the same JSON object.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the issue to label.
    issue_url: String,
    /// Label names, sent as the `labels` field.
    labels: Vec<String>,
}
194
/// Arguments of the `request_review` outbound method.
#[derive(Debug, Deserialize)]
struct RequestReviewArgs {
    /// Connection settings, read from the same JSON object.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the pull request.
    pr_url: String,
    /// Reviewer logins, sent as the `reviewers` field.
    reviewers: Vec<String>,
}
202
/// Arguments of the `merge_pr` outbound method.
#[derive(Debug, Deserialize)]
struct MergePrArgs {
    /// Connection settings, read from the same JSON object.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the pull request to merge.
    pr_url: String,
    /// Optional `commit_title` forwarded in the request body.
    #[serde(default)]
    commit_title: Option<String>,
    /// Optional `commit_message` forwarded in the request body.
    #[serde(default)]
    commit_message: Option<String>,
    /// Optional `merge_method` forwarded in the request body.
    #[serde(default)]
    merge_method: Option<String>,
    /// Optional `sha` forwarded in the request body.
    #[serde(default)]
    sha: Option<String>,
    /// Not sent to GitHub; when true the response is annotated with
    /// `admin_override_requested: true`.
    #[serde(default)]
    admin_override: bool,
}
219
/// Arguments of the `list_stale_prs` outbound method.
#[derive(Debug, Deserialize)]
struct ListStalePrsArgs {
    /// Connection settings, read from the same JSON object.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// Repository reference, parsed by `parse_repo_ref`.
    repo: String,
    /// Age threshold in days: open PRs last updated before `now - days` (UTC date) are searched.
    days: i64,
}
227
/// Arguments of the `get_pr_diff` outbound method.
#[derive(Debug, Deserialize)]
struct GetPrDiffArgs {
    /// Connection settings, read from the same JSON object.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the pull request whose diff is fetched.
    pr_url: String,
}
234
/// Arguments of the `create_issue` outbound method.
#[derive(Debug, Deserialize)]
struct CreateIssueArgs {
    /// Connection settings, read from the same JSON object.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// Repository reference, parsed by `parse_repo_ref`.
    repo: String,
    /// Issue title; always sent.
    title: String,
    /// Optional issue body; omitted from the request when absent.
    #[serde(default)]
    body: Option<String>,
    /// Optional label names; omitted from the request when absent.
    #[serde(default)]
    labels: Option<Vec<String>>,
}
246
/// Fields read from the installation access-token response.
#[derive(Debug, Deserialize)]
struct InstallationTokenResponse {
    /// The access token.
    token: String,
    /// Expiry timestamp text, if present; used to compute when to refresh.
    #[serde(default)]
    expires_at: Option<String>,
}
253
/// Claims of the app JWT minted by `mint_app_jwt`.
#[derive(Debug, Serialize)]
struct GitHubJwtClaims {
    /// Issued-at, as a Unix timestamp (set 60 seconds in the past).
    iat: i64,
    /// Expiry, as a Unix timestamp (set 9 minutes in the future).
    exp: i64,
    /// Issuer: the app id rendered as a string.
    iss: String,
}
260
/// Owner/name pair identifying a repository in API paths.
#[derive(Clone, Debug)]
struct RepoRef {
    /// Account that owns the repository.
    owner: String,
    /// Repository name.
    repo: String,
}
266
/// A numbered issue or pull request within a repository.
#[derive(Clone, Debug)]
struct IssueRef {
    /// Repository containing the item.
    repo: RepoRef,
    /// Issue or pull-request number.
    number: u64,
}
272
273impl GitHubConnector {
274 pub fn new() -> Self {
275 let state = Arc::new(GitHubConnectorState::default());
276 let client = Arc::new(GitHubClient {
277 provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
278 state: state.clone(),
279 http: reqwest::Client::builder()
280 .user_agent("harn-github-connector")
281 .build()
282 .unwrap_or_else(|_| reqwest::Client::new()),
283 tokens: GitHubInstallationTokenStore::new(32),
284 });
285 Self {
286 provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
287 kinds: vec![TriggerKind::from("webhook")],
288 state,
289 client,
290 }
291 }
292
293 fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedGitHubBinding, ConnectorError> {
294 let bindings = self
295 .state
296 .bindings
297 .read()
298 .expect("github connector bindings poisoned");
299 if let Some(binding_id) = raw.metadata.get("binding_id").and_then(JsonValue::as_str) {
300 return bindings.get(binding_id).cloned().ok_or_else(|| {
301 ConnectorError::Unsupported(format!(
302 "github connector has no active binding `{binding_id}`"
303 ))
304 });
305 }
306 if bindings.len() == 1 {
307 return bindings
308 .values()
309 .next()
310 .cloned()
311 .ok_or_else(|| ConnectorError::Activation("github bindings missing".to_string()));
312 }
313 Err(ConnectorError::Unsupported(
314 "github connector requires raw.metadata.binding_id when multiple bindings are active"
315 .to_string(),
316 ))
317 }
318
319 fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
320 self.state
321 .ctx
322 .read()
323 .expect("github connector ctx poisoned")
324 .clone()
325 .ok_or_else(|| {
326 ConnectorError::Activation(
327 "github connector must be initialized before use".to_string(),
328 )
329 })
330 }
331}
332
impl Default for GitHubConnector {
    /// Equivalent to [`GitHubConnector::new`].
    fn default() -> Self {
        Self::new()
    }
}
338
339#[async_trait]
340impl Connector for GitHubConnector {
341 fn provider_id(&self) -> &ProviderId {
342 &self.provider_id
343 }
344
345 fn kinds(&self) -> &[TriggerKind] {
346 &self.kinds
347 }
348
349 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
350 *self
351 .state
352 .ctx
353 .write()
354 .expect("github connector ctx poisoned") = Some(ctx);
355 Ok(())
356 }
357
358 async fn activate(
359 &self,
360 bindings: &[TriggerBinding],
361 ) -> Result<ActivationHandle, ConnectorError> {
362 let mut configured = HashMap::new();
363 let mut paths = BTreeSet::new();
364 for binding in bindings {
365 let activated = ActivatedGitHubBinding::from_binding(binding)?;
366 if let Some(path) = &activated.path {
367 if !paths.insert(path.clone()) {
368 return Err(ConnectorError::Activation(format!(
369 "github connector path `{path}` is configured by multiple bindings"
370 )));
371 }
372 }
373 configured.insert(binding.binding_id.clone(), activated);
374 }
375 *self
376 .state
377 .bindings
378 .write()
379 .expect("github connector bindings poisoned") = configured;
380 Ok(ActivationHandle::new(
381 self.provider_id.clone(),
382 bindings.len(),
383 ))
384 }
385
386 fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
387 let ctx = self.ctx()?;
388 let binding = self.binding_for_raw(&raw)?;
389 let provider = self.provider_id.clone();
390 let received_at = raw.received_at;
391 let headers = effective_headers(&raw.headers);
392 let secret = load_secret_text_blocking(&ctx, &binding.signing_secret)?;
393 futures::executor::block_on(crate::connectors::hmac::verify_hmac_signed(
394 ctx.event_log.as_ref(),
395 &provider,
396 HmacSignatureStyle::github(),
397 &raw.body,
398 &headers,
399 secret.as_str(),
400 None,
401 received_at,
402 ))?;
403
404 let payload = raw.json_body()?;
405 let event_kind = header_value(&headers, "x-github-event")
406 .map(ToString::to_string)
407 .or_else(|| {
408 if raw.kind.trim().is_empty() {
409 None
410 } else {
411 Some(raw.kind.clone())
412 }
413 })
414 .ok_or_else(|| ConnectorError::MissingHeader("X-GitHub-Event".to_string()))?;
415 let _typed = parse_typed_event(&event_kind, &payload)?;
416 let dedupe_key = header_value(&headers, "x-github-delivery")
417 .map(ToString::to_string)
418 .unwrap_or_else(|| fallback_body_digest(&raw.body));
419 if binding.dedupe_enabled {
420 let inserted = futures::executor::block_on(ctx.inbox.insert_if_new(
421 &binding.binding_id,
422 &dedupe_key,
423 binding.dedupe_ttl,
424 ))?;
425 if !inserted {
426 return Err(ConnectorError::DuplicateDelivery(format!(
427 "duplicate GitHub delivery `{dedupe_key}` for binding `{}`",
428 binding.binding_id
429 )));
430 }
431 }
432
433 let provider_payload =
434 ProviderPayload::normalize(&provider, &event_kind, &headers, payload)
435 .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
436 Ok(TriggerEvent {
437 id: TriggerEventId::new(),
438 provider,
439 kind: event_kind,
440 received_at,
441 occurred_at: raw.occurred_at,
442 dedupe_key,
443 trace_id: TraceId::new(),
444 tenant_id: raw.tenant_id.clone(),
445 headers: redact_headers(&headers, &HeaderRedactionPolicy::default()),
446 batch: None,
447 provider_payload,
448 signature_status: SignatureStatus::Verified,
449 dedupe_claimed: false,
450 })
451 }
452
453 fn payload_schema(&self) -> ProviderPayloadSchema {
454 ProviderPayloadSchema::named("GitHubEventPayload")
455 }
456
457 fn client(&self) -> Arc<dyn ConnectorClient> {
458 self.client.clone()
459 }
460}
461
462#[async_trait]
463impl ConnectorClient for GitHubClient {
464 async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError> {
465 match method {
466 "comment" => {
467 let args: CommentArgs = parse_args(args)?;
468 let config = self.resolve_client_config(&args.config)?;
469 let issue = parse_issue_like_url(&args.issue_url, "issue")?;
470 self.request_json(
471 &config,
472 Method::POST,
473 &format!(
474 "/repos/{}/{}/issues/{}/comments",
475 issue.repo.owner, issue.repo.repo, issue.number
476 ),
477 Some(json!({ "body": args.body })),
478 "application/vnd.github+json",
479 )
480 .await
481 }
482 "add_labels" => {
483 let args: AddLabelsArgs = parse_args(args)?;
484 let config = self.resolve_client_config(&args.config)?;
485 let issue = parse_issue_like_url(&args.issue_url, "issue")?;
486 self.request_json(
487 &config,
488 Method::POST,
489 &format!(
490 "/repos/{}/{}/issues/{}/labels",
491 issue.repo.owner, issue.repo.repo, issue.number
492 ),
493 Some(json!({ "labels": args.labels })),
494 "application/vnd.github+json",
495 )
496 .await
497 }
498 "request_review" => {
499 let args: RequestReviewArgs = parse_args(args)?;
500 let config = self.resolve_client_config(&args.config)?;
501 let pr = parse_issue_like_url(&args.pr_url, "pull")?;
502 self.request_json(
503 &config,
504 Method::POST,
505 &format!(
506 "/repos/{}/{}/pulls/{}/requested_reviewers",
507 pr.repo.owner, pr.repo.repo, pr.number
508 ),
509 Some(json!({ "reviewers": args.reviewers })),
510 "application/vnd.github+json",
511 )
512 .await
513 }
514 "merge_pr" => {
515 let args: MergePrArgs = parse_args(args)?;
516 let config = self.resolve_client_config(&args.config)?;
517 let pr = parse_issue_like_url(&args.pr_url, "pull")?;
518 let mut body = JsonMap::new();
519 if let Some(value) = args.commit_title {
520 body.insert("commit_title".to_string(), JsonValue::String(value));
521 }
522 if let Some(value) = args.commit_message {
523 body.insert("commit_message".to_string(), JsonValue::String(value));
524 }
525 if let Some(value) = args.merge_method {
526 body.insert("merge_method".to_string(), JsonValue::String(value));
527 }
528 if let Some(value) = args.sha {
529 body.insert("sha".to_string(), JsonValue::String(value));
530 }
531 let mut response = self
532 .request_json(
533 &config,
534 Method::PUT,
535 &format!(
536 "/repos/{}/{}/pulls/{}/merge",
537 pr.repo.owner, pr.repo.repo, pr.number
538 ),
539 Some(JsonValue::Object(body)),
540 "application/vnd.github+json",
541 )
542 .await?;
543 if args.admin_override {
544 if let Some(map) = response.as_object_mut() {
545 map.insert(
546 "admin_override_requested".to_string(),
547 JsonValue::Bool(true),
548 );
549 }
550 }
551 Ok(response)
552 }
553 "list_stale_prs" => {
554 let args: ListStalePrsArgs = parse_args(args)?;
555 let config = self.resolve_client_config(&args.config)?;
556 let repo = parse_repo_ref(&args.repo)?;
557 let stale_before = (OffsetDateTime::now_utc() - Duration::days(args.days))
558 .date()
559 .to_string();
560 let query = format!(
561 "repo:{}/{} is:pr is:open updated:<{}",
562 repo.owner, repo.repo, stale_before
563 );
564 let url = Url::parse_with_params(
565 &absolute_api_url(&config.api_base_url, "/search/issues")?,
566 &[("q", query)],
567 )
568 .map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
569 let response = self
570 .request_response(
571 &config,
572 Method::GET,
573 url.to_string(),
574 None,
575 "application/vnd.github+json",
576 )
577 .await?;
578 response
579 .json::<JsonValue>()
580 .await
581 .map_err(|error| ClientError::Transport(error.to_string()))
582 }
583 "get_pr_diff" => {
584 let args: GetPrDiffArgs = parse_args(args)?;
585 let config = self.resolve_client_config(&args.config)?;
586 let pr = parse_issue_like_url(&args.pr_url, "pull")?;
587 let text = self
588 .request_text(
589 &config,
590 Method::GET,
591 &format!(
592 "/repos/{}/{}/pulls/{}",
593 pr.repo.owner, pr.repo.repo, pr.number
594 ),
595 "application/vnd.github.diff",
596 )
597 .await?;
598 Ok(JsonValue::String(text))
599 }
600 "create_issue" => {
601 let args: CreateIssueArgs = parse_args(args)?;
602 let config = self.resolve_client_config(&args.config)?;
603 let repo = parse_repo_ref(&args.repo)?;
604 let mut body = JsonMap::new();
605 body.insert("title".to_string(), JsonValue::String(args.title));
606 if let Some(value) = args.body {
607 body.insert("body".to_string(), JsonValue::String(value));
608 }
609 if let Some(labels) = args.labels {
610 body.insert("labels".to_string(), json!(labels));
611 }
612 self.request_json(
613 &config,
614 Method::POST,
615 &format!("/repos/{}/{}/issues", repo.owner, repo.repo),
616 Some(JsonValue::Object(body)),
617 "application/vnd.github+json",
618 )
619 .await
620 }
621 other => Err(ClientError::MethodNotFound(format!(
622 "github connector does not implement outbound method `{other}`"
623 ))),
624 }
625 }
626}
627
628impl GitHubClient {
629 fn resolve_client_config(
630 &self,
631 args: &GitHubClientConfigArgs,
632 ) -> Result<ResolvedGitHubClientConfig, ClientError> {
633 let app_id = args.app_id.ok_or_else(|| {
634 ClientError::InvalidArgs("github connector requires app_id".to_string())
635 })?;
636 let installation_id = args.installation_id.ok_or_else(|| {
637 ClientError::InvalidArgs("github connector requires installation_id".to_string())
638 })?;
639 let private_key = if let Some(secret_id) = args
640 .private_key_secret
641 .as_deref()
642 .or(args.secrets.private_key.as_deref())
643 .and_then(|value| parse_secret_id(Some(value)))
644 {
645 PrivateKeySource::Secret(secret_id)
646 } else if let Some(pem) = args.private_key_pem.clone() {
647 PrivateKeySource::Inline(pem)
648 } else {
649 return Err(ClientError::InvalidArgs(
650 "github connector requires private_key_secret or private_key_pem".to_string(),
651 ));
652 };
653 Ok(ResolvedGitHubClientConfig {
654 app_id,
655 installation_id,
656 api_base_url: args
657 .api_base_url
658 .clone()
659 .unwrap_or_else(|| DEFAULT_API_BASE_URL.to_string()),
660 private_key,
661 })
662 }
663
664 fn ctx(&self) -> Result<ConnectorCtx, ClientError> {
665 self.state
666 .ctx
667 .read()
668 .expect("github connector ctx poisoned")
669 .clone()
670 .ok_or_else(|| ClientError::Other("github connector must be initialized".to_string()))
671 }
672
673 async fn request_json(
674 &self,
675 config: &ResolvedGitHubClientConfig,
676 method: Method,
677 path: &str,
678 body: Option<JsonValue>,
679 accept: &str,
680 ) -> Result<JsonValue, ClientError> {
681 let url = absolute_api_url(&config.api_base_url, path)?;
682 let response = self
683 .request_response(config, method, url, body, accept)
684 .await?;
685 response
686 .json::<JsonValue>()
687 .await
688 .map_err(|error| ClientError::Transport(error.to_string()))
689 }
690
691 async fn request_text(
692 &self,
693 config: &ResolvedGitHubClientConfig,
694 method: Method,
695 path: &str,
696 accept: &str,
697 ) -> Result<String, ClientError> {
698 let url = absolute_api_url(&config.api_base_url, path)?;
699 let response = self
700 .request_response(config, method, url, None, accept)
701 .await?;
702 response
703 .text()
704 .await
705 .map_err(|error| ClientError::Transport(error.to_string()))
706 }
707
708 async fn request_response(
709 &self,
710 config: &ResolvedGitHubClientConfig,
711 method: Method,
712 url: String,
713 body: Option<JsonValue>,
714 accept: &str,
715 ) -> Result<Response, ClientError> {
716 let mut retried_401 = false;
717 let mut retried_rate_limit = false;
718 loop {
719 let token = self.installation_token(config).await?;
720 let token_text = token.with_exposed(|bytes| String::from_utf8_lossy(bytes).to_string());
721 let ctx = self.ctx()?;
722 ctx.rate_limiter
723 .scoped(
724 &self.provider_id,
725 format!("installation:{}", config.installation_id),
726 )
727 .acquire()
728 .await;
729
730 let mut request = self
731 .http
732 .request(method.clone(), &url)
733 .header("Accept", accept)
734 .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
735 .header("Authorization", format!("Bearer {token_text}"));
736 if let Some(payload) = body.clone() {
737 request = request.json(&payload);
738 }
739 let response = request
740 .send()
741 .await
742 .map_err(|error| ClientError::Transport(error.to_string()))?;
743 self.record_rate_limit_observation(&response).await;
744
745 if response.status() == StatusCode::UNAUTHORIZED && !retried_401 {
746 self.tokens.invalidate(config.installation_id);
747 retried_401 = true;
748 continue;
749 }
750 if is_rate_limited(&response) && !retried_rate_limit {
751 tokio::time::sleep(rate_limit_backoff(&response)).await;
752 retried_rate_limit = true;
753 continue;
754 }
755 if !response.status().is_success() {
756 let status = response.status();
757 let body = response.text().await.unwrap_or_default();
758 let message = if body.trim().is_empty() {
759 format!("github API request failed with status {status}")
760 } else {
761 format!("github API request failed with status {status}: {body}")
762 };
763 return Err(if is_rate_limited_status(status) {
764 ClientError::RateLimited(message)
765 } else {
766 ClientError::Transport(message)
767 });
768 }
769 return Ok(response);
770 }
771 }
772
773 async fn installation_token(
774 &self,
775 config: &ResolvedGitHubClientConfig,
776 ) -> Result<SecretBytes, ClientError> {
777 let now = OffsetDateTime::now_utc();
778 if let Some(token) = self.tokens.get(config.installation_id, now) {
779 return Ok(token);
780 }
781 let jwt = self.mint_app_jwt(config).await?;
782 let url = absolute_api_url(
783 &config.api_base_url,
784 &format!(
785 "/app/installations/{}/access_tokens",
786 config.installation_id
787 ),
788 )?;
789 let ctx = self.ctx()?;
790 ctx.rate_limiter
791 .scoped(
792 &self.provider_id,
793 format!("installation:{}", config.installation_id),
794 )
795 .acquire()
796 .await;
797 let response = self
798 .http
799 .post(url)
800 .header("Accept", "application/vnd.github+json")
801 .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
802 .header("Authorization", format!("Bearer {jwt}"))
803 .send()
804 .await
805 .map_err(|error| ClientError::Transport(error.to_string()))?;
806 self.record_rate_limit_observation(&response).await;
807 if !response.status().is_success() {
808 let status = response.status();
809 let body = response.text().await.unwrap_or_default();
810 return Err(ClientError::Transport(format!(
811 "failed to create GitHub installation token ({status}): {body}"
812 )));
813 }
814 let payload: InstallationTokenResponse = response
815 .json()
816 .await
817 .map_err(|error| ClientError::Transport(error.to_string()))?;
818 let refresh_at = installation_token_refresh_at(payload.expires_at.as_deref(), now);
819 let token = SecretBytes::from(payload.token);
820 let result = token.reborrow();
821 self.tokens.store(config.installation_id, token, refresh_at);
822 Ok(result)
823 }
824
825 async fn mint_app_jwt(
826 &self,
827 config: &ResolvedGitHubClientConfig,
828 ) -> Result<String, ClientError> {
829 let pem = self.private_key_pem(config).await?;
830 let now = OffsetDateTime::now_utc();
831 let claims = GitHubJwtClaims {
832 iat: (now - Duration::seconds(60)).unix_timestamp(),
833 exp: (now + Duration::minutes(9)).unix_timestamp(),
834 iss: config.app_id.to_string(),
835 };
836 let header = Header::new(Algorithm::RS256);
837 let key = EncodingKey::from_rsa_pem(pem.as_bytes())
838 .map_err(|error| ClientError::Other(error.to_string()))?;
839 jsonwebtoken::encode(&header, &claims, &key)
840 .map_err(|error| ClientError::Other(error.to_string()))
841 }
842
843 async fn private_key_pem(
844 &self,
845 config: &ResolvedGitHubClientConfig,
846 ) -> Result<String, ClientError> {
847 match &config.private_key {
848 PrivateKeySource::Inline(pem) => Ok(pem.clone()),
849 PrivateKeySource::Secret(id) => {
850 let ctx = self.ctx()?;
851 let secret = ctx
852 .secrets
853 .get(id)
854 .await
855 .map_err(|error| ClientError::Other(error.to_string()))?;
856 secret.with_exposed(|bytes| {
857 std::str::from_utf8(bytes)
858 .map(|value| value.to_string())
859 .map_err(|error| ClientError::Other(error.to_string()))
860 })
861 }
862 }
863 }
864
865 async fn record_rate_limit_observation(&self, response: &Response) {
866 let remaining = response
867 .headers()
868 .get("x-ratelimit-remaining")
869 .and_then(|value| value.to_str().ok())
870 .and_then(|value| value.parse::<u64>().ok());
871 let reset = response
872 .headers()
873 .get("x-ratelimit-reset")
874 .and_then(|value| value.to_str().ok())
875 .and_then(|value| value.parse::<i64>().ok());
876 let Some(remaining) = remaining else {
877 return;
878 };
879 if remaining > 10 {
880 return;
881 }
882 let Ok(ctx) = self.ctx() else {
883 return;
884 };
885 let Ok(topic) = Topic::new(GITHUB_RATE_LIMIT_TOPIC) else {
886 return;
887 };
888 let _ = ctx
889 .event_log
890 .append(
891 &topic,
892 LogEvent::new(
893 "github.rate_limit",
894 json!({
895 "remaining": remaining,
896 "reset": reset,
897 "status": response.status().as_u16(),
898 }),
899 ),
900 )
901 .await;
902 }
903}
904
905impl ActivatedGitHubBinding {
906 fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
907 let config: GitHubBindingConfig =
908 serde_json::from_value(binding.config.clone()).map_err(|error| {
909 ConnectorError::Activation(format!(
910 "github binding `{}` has invalid config: {error}",
911 binding.binding_id
912 ))
913 })?;
914 let signing_secret =
915 parse_secret_id(config.secrets.signing_secret.as_deref()).ok_or_else(|| {
916 ConnectorError::Activation(format!(
917 "github binding `{}` requires secrets.signing_secret",
918 binding.binding_id
919 ))
920 })?;
921 Ok(Self {
922 binding_id: binding.binding_id.clone(),
923 path: config.match_config.path,
924 signing_secret,
925 dedupe_enabled: binding.dedupe_key.is_some(),
926 dedupe_ttl: std::time::Duration::from_secs(
927 u64::from(crate::triggers::DEFAULT_INBOX_RETENTION_DAYS) * 24 * 60 * 60,
928 ),
929 })
930 }
931}
932
933impl GitHubInstallationTokenStore {
934 fn new(capacity: usize) -> Self {
935 Self {
936 capacity: capacity.max(1),
937 state: Mutex::new(TokenCacheState::default()),
938 }
939 }
940
941 fn get(&self, installation_id: u64, now: OffsetDateTime) -> Option<SecretBytes> {
942 let mut state = self.state.lock().expect("github token cache poisoned");
943 let refresh_at = state
944 .entries
945 .get(&installation_id)
946 .map(|entry| entry.refresh_at)?;
947 if refresh_at <= now {
948 state.entries.remove(&installation_id);
949 state.order.retain(|id| *id != installation_id);
950 return None;
951 }
952 touch_lru(&mut state.order, installation_id);
953 state
954 .entries
955 .get(&installation_id)
956 .map(|entry| entry.token.reborrow())
957 }
958
959 fn store(&self, installation_id: u64, token: SecretBytes, refresh_at: OffsetDateTime) {
960 let mut state = self.state.lock().expect("github token cache poisoned");
961 state.entries.insert(
962 installation_id,
963 InstallationTokenEntry { token, refresh_at },
964 );
965 touch_lru(&mut state.order, installation_id);
966 while state.entries.len() > self.capacity {
967 if let Some(evicted) = state.order.pop_front() {
968 state.entries.remove(&evicted);
969 }
970 }
971 }
972
973 fn invalidate(&self, installation_id: u64) {
974 let mut state = self.state.lock().expect("github token cache poisoned");
975 state.entries.remove(&installation_id);
976 state.order.retain(|id| *id != installation_id);
977 }
978}
979
/// Marks `installation_id` as most recently used: every existing occurrence
/// is removed from `order` and the id is appended at the back.
fn touch_lru(order: &mut VecDeque<u64>, installation_id: u64) {
    while let Some(index) = order.iter().position(|id| *id == installation_id) {
        order.remove(index);
    }
    order.push_back(installation_id);
}
984
985fn parse_args<T: DeserializeOwned>(args: JsonValue) -> Result<T, ClientError> {
986 serde_json::from_value(args).map_err(|error| ClientError::InvalidArgs(error.to_string()))
987}
988
989fn parse_typed_event(kind: &str, payload: &JsonValue) -> Result<ParsedGitHubEvent, ConnectorError> {
990 match kind {
991 "issues" => serde_json::from_value(payload.clone())
992 .map(ParsedGitHubEvent::Issues)
993 .map_err(|error| ConnectorError::Json(error.to_string())),
994 "pull_request" => serde_json::from_value(payload.clone())
995 .map(ParsedGitHubEvent::PullRequest)
996 .map_err(|error| ConnectorError::Json(error.to_string())),
997 "issue_comment" => serde_json::from_value(payload.clone())
998 .map(ParsedGitHubEvent::IssueComment)
999 .map_err(|error| ConnectorError::Json(error.to_string())),
1000 "pull_request_review" => serde_json::from_value(payload.clone())
1001 .map(ParsedGitHubEvent::PullRequestReview)
1002 .map_err(|error| ConnectorError::Json(error.to_string())),
1003 "push" => serde_json::from_value(payload.clone())
1004 .map(ParsedGitHubEvent::Push)
1005 .map_err(|error| ConnectorError::Json(error.to_string())),
1006 "workflow_run" => serde_json::from_value(payload.clone())
1007 .map(ParsedGitHubEvent::WorkflowRun)
1008 .map_err(|error| ConnectorError::Json(error.to_string())),
1009 other => Ok(ParsedGitHubEvent::Other {
1010 kind: other.to_string(),
1011 raw: payload.clone(),
1012 }),
1013 }
1014}
1015
1016fn effective_headers(headers: &BTreeMap<String, String>) -> BTreeMap<String, String> {
1017 let mut effective = headers.clone();
1018 canonicalize_header(headers, &mut effective, "content-type", "Content-Type");
1019 canonicalize_header(headers, &mut effective, "x-github-event", "X-GitHub-Event");
1020 canonicalize_header(
1021 headers,
1022 &mut effective,
1023 "x-github-delivery",
1024 "X-GitHub-Delivery",
1025 );
1026 canonicalize_header(
1027 headers,
1028 &mut effective,
1029 "x-github-hook-id",
1030 "X-GitHub-Hook-Id",
1031 );
1032 canonicalize_header(
1033 headers,
1034 &mut effective,
1035 "x-hub-signature-256",
1036 "X-Hub-Signature-256",
1037 );
1038 effective
1039}
1040
1041fn canonicalize_header(
1042 source: &BTreeMap<String, String>,
1043 target: &mut BTreeMap<String, String>,
1044 lookup_name: &str,
1045 canonical_name: &str,
1046) {
1047 if let Some(value) = header_value(source, lookup_name) {
1048 target
1049 .entry(canonical_name.to_string())
1050 .or_insert_with(|| value.to_string());
1051 }
1052}
1053
/// Looks up a header by name, ignoring ASCII case.
///
/// When several keys match, the first one in the map's key order wins.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
1060
1061fn parse_secret_id(raw: Option<&str>) -> Option<SecretId> {
1062 let trimmed = raw?.trim();
1063 if trimmed.is_empty() {
1064 return None;
1065 }
1066 let (base, version) = match trimmed.rsplit_once('@') {
1067 Some((base, version_text)) => {
1068 let version = version_text.parse::<u64>().ok()?;
1069 (base, SecretVersion::Exact(version))
1070 }
1071 None => (trimmed, SecretVersion::Latest),
1072 };
1073 let (namespace, name) = base.split_once('/')?;
1074 if namespace.is_empty() || name.is_empty() {
1075 return None;
1076 }
1077 Some(SecretId::new(namespace, name).with_version(version))
1078}
1079
1080fn load_secret_text_blocking(
1081 ctx: &ConnectorCtx,
1082 secret_id: &SecretId,
1083) -> Result<String, ConnectorError> {
1084 let secret = futures::executor::block_on(ctx.secrets.get(secret_id))?;
1085 secret.with_exposed(|bytes| {
1086 std::str::from_utf8(bytes)
1087 .map(|value| value.to_string())
1088 .map_err(|error| {
1089 ConnectorError::Secret(format!(
1090 "github secret `{secret_id}` is not valid UTF-8: {error}"
1091 ))
1092 })
1093 })
1094}
1095
1096fn fallback_body_digest(body: &[u8]) -> String {
1097 let digest = Sha256::digest(body);
1098 format!("sha256:{}", hex::encode(digest))
1099}
1100
1101fn parse_repo_ref(input: &str) -> Result<RepoRef, ClientError> {
1102 let trimmed = input.trim().trim_matches('/');
1103 if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1104 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1105 if parts.len() >= 2 {
1106 return Ok(RepoRef {
1107 owner: parts[0].to_string(),
1108 repo: parts[1].to_string(),
1109 });
1110 }
1111 }
1112 if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1113 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1114 if parts.len() >= 2 {
1115 return Ok(RepoRef {
1116 owner: parts[0].to_string(),
1117 repo: parts[1].to_string(),
1118 });
1119 }
1120 }
1121 if let Some((owner, repo)) = trimmed.split_once('/') {
1122 if !owner.is_empty() && !repo.is_empty() {
1123 return Ok(RepoRef {
1124 owner: owner.to_string(),
1125 repo: repo.to_string(),
1126 });
1127 }
1128 }
1129 Err(ClientError::InvalidArgs(format!(
1130 "invalid GitHub repository reference `{input}`"
1131 )))
1132}
1133
1134fn parse_issue_like_url(input: &str, expected_kind: &str) -> Result<IssueRef, ClientError> {
1135 let trimmed = input.trim().trim_matches('/');
1136 if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1137 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1138 if parts.len() >= 4 {
1139 let kind = parts[2];
1140 if kind == expected_kind || (expected_kind == "issue" && kind == "issues") {
1141 return Ok(IssueRef {
1142 repo: RepoRef {
1143 owner: parts[0].to_string(),
1144 repo: parts[1].to_string(),
1145 },
1146 number: parts[3].parse().map_err(|error| {
1147 ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1148 })?,
1149 });
1150 }
1151 }
1152 }
1153 if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1154 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1155 if parts.len() >= 4 {
1156 let kind = parts[2];
1157 if kind == expected_kind
1158 || (expected_kind == "issue" && kind == "issues")
1159 || (expected_kind == "pull" && kind == "pulls")
1160 {
1161 return Ok(IssueRef {
1162 repo: RepoRef {
1163 owner: parts[0].to_string(),
1164 repo: parts[1].to_string(),
1165 },
1166 number: parts[3].parse().map_err(|error| {
1167 ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1168 })?,
1169 });
1170 }
1171 }
1172 }
1173 Err(ClientError::InvalidArgs(format!(
1174 "invalid GitHub {expected_kind} URL `{input}`"
1175 )))
1176}
1177
1178fn absolute_api_url(base_url: &str, path: &str) -> Result<String, ClientError> {
1179 let base = Url::parse(base_url).map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
1180 base.join(path.trim_start_matches('/'))
1181 .map(|value| value.to_string())
1182 .map_err(|error| ClientError::InvalidArgs(error.to_string()))
1183}
1184
1185fn installation_token_refresh_at(expires_at: Option<&str>, now: OffsetDateTime) -> OffsetDateTime {
1186 let eager_refresh = now + Duration::minutes(55);
1187 let Some(expires_at) = expires_at else {
1188 return eager_refresh;
1189 };
1190 let parsed =
1191 OffsetDateTime::parse(expires_at, &time::format_description::well_known::Rfc3339).ok();
1192 parsed
1193 .map(|value| value - Duration::minutes(5))
1194 .map(|value| {
1195 if value < eager_refresh {
1196 value
1197 } else {
1198 eager_refresh
1199 }
1200 })
1201 .unwrap_or(eager_refresh)
1202}
1203
1204fn is_rate_limited(response: &Response) -> bool {
1205 if is_rate_limited_status(response.status()) {
1206 return true;
1207 }
1208 response
1209 .headers()
1210 .get("x-ratelimit-remaining")
1211 .and_then(|value| value.to_str().ok())
1212 .map(|value| value == "0")
1213 .unwrap_or(false)
1214}
1215
1216fn is_rate_limited_status(status: StatusCode) -> bool {
1217 status == StatusCode::TOO_MANY_REQUESTS || status == StatusCode::FORBIDDEN
1218}
1219
1220fn rate_limit_backoff(response: &Response) -> std::time::Duration {
1221 if let Some(delay) = response
1222 .headers()
1223 .get("retry-after")
1224 .and_then(|value| value.to_str().ok())
1225 .and_then(|value| value.parse::<u64>().ok())
1226 {
1227 return std::time::Duration::from_secs(delay);
1228 }
1229 if let Some(reset_at) = response
1230 .headers()
1231 .get("x-ratelimit-reset")
1232 .and_then(|value| value.to_str().ok())
1233 .and_then(|value| value.parse::<i64>().ok())
1234 .and_then(|value| OffsetDateTime::from_unix_timestamp(value).ok())
1235 {
1236 let now = OffsetDateTime::now_utc();
1237 if reset_at > now {
1238 let delta = reset_at - now;
1239 return std::time::Duration::from_secs(delta.whole_seconds().max(0) as u64);
1240 }
1241 }
1242 std::time::Duration::from_millis(100)
1243}