1use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
2use std::sync::{Arc, Mutex, RwLock};
3
4use async_trait::async_trait;
5use jsonwebtoken::{Algorithm, EncodingKey, Header};
6use reqwest::{Method, Response, StatusCode, Url};
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Map as JsonMap, Value as JsonValue};
10use sha2::{Digest, Sha256};
11use time::{Duration, OffsetDateTime};
12
13use crate::connectors::{
14 ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
15 HmacSignatureStyle, ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
16};
17use crate::event_log::{EventLog, LogEvent, Topic};
18use crate::secrets::{SecretBytes, SecretId, SecretVersion};
19use crate::triggers::{
20 redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus, TraceId,
21 TriggerEvent, TriggerEventId,
22};
23
24#[cfg(test)]
25mod tests;
26
/// Provider identifier this connector (and its outbound client) registers under.
pub const GITHUB_PROVIDER_ID: &str = "github";
/// Event-log topic that receives low-remaining rate-limit observations.
const GITHUB_RATE_LIMIT_TOPIC: &str = "connectors.github.rate_limit";
/// Value sent in the `X-GitHub-Api-Version` header on every outbound request.
const GITHUB_API_VERSION: &str = "2022-11-28";
/// API base URL used when the caller does not supply `api_base_url`.
const DEFAULT_API_BASE_URL: &str = "https://api.github.com";
31
/// GitHub connector: verifies and normalizes inbound webhook deliveries and
/// hands out an outbound API client.
pub struct GitHubConnector {
    /// Provider id returned from `Connector::provider_id`.
    provider_id: ProviderId,
    /// Trigger kinds returned from `Connector::kinds`.
    kinds: Vec<TriggerKind>,
    /// Connector context and active bindings; the same `Arc` is held by `client`.
    state: Arc<GitHubConnectorState>,
    /// Outbound client returned from `Connector::client`.
    client: Arc<GitHubClient>,
}
38
/// Mutable state shared between the connector and its outbound client.
#[derive(Default)]
struct GitHubConnectorState {
    /// Context supplied by `init`; `None` until the connector is initialized.
    ctx: RwLock<Option<ConnectorCtx>>,
    /// Currently activated bindings keyed by binding id; replaced wholesale by `activate`.
    bindings: RwLock<HashMap<String, ActivatedGitHubBinding>>,
}
44
/// Validated, ready-to-use view of a single trigger binding.
#[derive(Clone, Debug)]
struct ActivatedGitHubBinding {
    /// Id of the binding this was built from.
    binding_id: String,
    /// Optional `match.path` from the binding config; must be unique across active bindings.
    path: Option<String>,
    /// Secret holding the webhook signing key used for HMAC verification.
    signing_secret: SecretId,
    /// True when the binding declares a dedupe key.
    dedupe_enabled: bool,
    /// TTL passed to the inbox when recording a delivery for deduplication.
    dedupe_ttl: std::time::Duration,
}
53
/// Outbound GitHub API client authenticated with installation tokens.
struct GitHubClient {
    /// Provider id used to scope the rate limiter.
    provider_id: ProviderId,
    /// State shared with the owning connector (source of the `ConnectorCtx`).
    state: Arc<GitHubConnectorState>,
    /// HTTP client used for all outbound requests.
    http: reqwest::Client,
    /// Cache of installation access tokens.
    tokens: GitHubInstallationTokenStore,
}
60
/// Bounded cache of installation tokens with least-recently-used eviction.
struct GitHubInstallationTokenStore {
    /// Maximum number of cached tokens (at least 1).
    capacity: usize,
    /// Entries and their recency order, guarded by a single mutex.
    state: Mutex<TokenCacheState>,
}
65
/// Mutex-protected contents of the installation token cache.
#[derive(Default)]
struct TokenCacheState {
    /// Cached tokens keyed by installation id.
    entries: HashMap<u64, InstallationTokenEntry>,
    /// Installation ids ordered from least to most recently used.
    order: VecDeque<u64>,
}
71
/// One cached installation token.
struct InstallationTokenEntry {
    /// The installation access token.
    token: SecretBytes,
    /// Instant at or after which the entry is treated as expired and dropped on lookup.
    refresh_at: OffsetDateTime,
}
76
/// Fully resolved per-call client configuration (all required values present).
#[derive(Clone, Debug)]
struct ResolvedGitHubClientConfig {
    /// GitHub App id; used as the JWT `iss` claim.
    app_id: u64,
    /// Installation whose access token authenticates API requests.
    installation_id: u64,
    /// Base URL that API paths are resolved against.
    api_base_url: String,
    /// Where the App private key comes from.
    private_key: PrivateKeySource,
}
84
/// Source of the GitHub App private key used to sign the App JWT.
#[derive(Clone, Debug)]
enum PrivateKeySource {
    /// PEM text supplied directly in the call arguments.
    Inline(String),
    /// PEM stored in the secrets provider under this id.
    Secret(SecretId),
}
90
/// Typed view of a webhook payload, selected by the event kind.
///
/// Produced by `parse_typed_event`; kinds without a dedicated variant land in
/// `Other` with the untouched JSON.
#[allow(dead_code)]
#[derive(Debug)]
enum ParsedGitHubEvent {
    /// `issues` event.
    Issues(IssuesEvent),
    /// `pull_request` event.
    PullRequest(PullRequestEvent),
    /// `issue_comment` event.
    IssueComment(IssueCommentEvent),
    /// `pull_request_review` event.
    PullRequestReview(PullRequestReviewEvent),
    /// `push` event.
    Push(PushEvent),
    /// `workflow_run` event.
    WorkflowRun(WorkflowRunEvent),
    /// `deployment_status` event.
    DeploymentStatus(DeploymentStatusEvent),
    /// `check_run` event.
    CheckRun(CheckRunEvent),
    /// Any other event kind, with its raw payload.
    Other { kind: String, raw: JsonValue },
}
104
/// Fields deserialized from an `issues` webhook payload.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct IssuesEvent {
    /// Payload `action`, when present.
    action: Option<String>,
    /// Raw `issue` object.
    issue: JsonValue,
}
111
/// Fields deserialized from a `pull_request` webhook payload.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct PullRequestEvent {
    /// Payload `action`, when present.
    action: Option<String>,
    /// Raw `pull_request` object.
    pull_request: JsonValue,
}
118
/// Fields deserialized from an `issue_comment` webhook payload.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct IssueCommentEvent {
    /// Payload `action`, when present.
    action: Option<String>,
    /// Raw `comment` object.
    comment: JsonValue,
    /// Raw `issue` object.
    issue: JsonValue,
}
126
/// Fields deserialized from a `pull_request_review` webhook payload.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct PullRequestReviewEvent {
    /// Payload `action`, when present.
    action: Option<String>,
    /// Raw `review` object.
    review: JsonValue,
    /// Raw `pull_request` object.
    pull_request: JsonValue,
}
134
/// Fields deserialized from a `push` webhook payload.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct PushEvent {
    /// Raw commit objects; empty when the payload has no `commits` field.
    #[serde(default)]
    commits: Vec<JsonValue>,
    /// Payload `distinct_size`, when present.
    #[serde(default)]
    distinct_size: Option<u64>,
}
143
/// Fields deserialized from a `workflow_run` webhook payload.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct WorkflowRunEvent {
    /// Payload `action`, when present.
    action: Option<String>,
    /// Raw `workflow_run` object.
    workflow_run: JsonValue,
}
150
/// Fields deserialized from a `deployment_status` webhook payload.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct DeploymentStatusEvent {
    /// Payload `action`, when present.
    action: Option<String>,
    /// Raw `deployment_status` object.
    deployment_status: JsonValue,
    /// Raw `deployment` object.
    deployment: JsonValue,
}
158
/// Fields deserialized from a `check_run` webhook payload.
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
struct CheckRunEvent {
    /// Payload `action`, when present.
    action: Option<String>,
    /// Raw `check_run` object.
    check_run: JsonValue,
}
165
/// Shape of a trigger binding's `config` value for this connector.
#[derive(Debug, Default, Deserialize)]
struct GitHubBindingConfig {
    /// The `match` section (named `match_config` because `match` is a keyword).
    #[serde(default, rename = "match")]
    match_config: GitHubMatchConfig,
    /// The `secrets` section.
    #[serde(default)]
    secrets: GitHubSecretsConfig,
}
173
/// `match` section of a binding config.
#[derive(Debug, Default, Deserialize)]
struct GitHubMatchConfig {
    /// Optional path for the binding; activation rejects duplicates.
    path: Option<String>,
}
178
/// `secrets` section shared by binding configs and outbound call arguments.
#[derive(Debug, Default, Deserialize)]
struct GitHubSecretsConfig {
    /// Secret id text (`namespace/name[@version]`) of the webhook signing secret.
    signing_secret: Option<String>,
    /// Secret id text (`namespace/name[@version]`) of the App private key.
    private_key: Option<String>,
}
184
/// Authentication and endpoint arguments accepted by every outbound method.
///
/// Flattened into each method's argument struct, so these keys sit next to the
/// method-specific ones in the call's JSON object.
#[derive(Debug, Default, Deserialize)]
struct GitHubClientConfigArgs {
    /// GitHub App id (required at resolution time).
    app_id: Option<u64>,
    /// Installation id (required at resolution time).
    installation_id: Option<u64>,
    /// Overrides the default API base URL.
    api_base_url: Option<String>,
    /// Inline PEM private key; used when no secret reference resolves.
    private_key_pem: Option<String>,
    /// Secret id text of the private key; takes precedence over `secrets.private_key`.
    private_key_secret: Option<String>,
    /// Nested `secrets` section; `private_key` is consulted as a fallback reference.
    #[serde(default)]
    secrets: GitHubSecretsConfig,
}
195
/// Arguments of the `comment` outbound method.
#[derive(Debug, Deserialize)]
struct CommentArgs {
    /// Shared client configuration keys.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the issue to comment on.
    issue_url: String,
    /// Comment text, sent as the `body` field.
    body: String,
}
203
/// Arguments of the `add_labels` outbound method.
#[derive(Debug, Deserialize)]
struct AddLabelsArgs {
    /// Shared client configuration keys.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the issue to label.
    issue_url: String,
    /// Label names, sent as the `labels` field.
    labels: Vec<String>,
}
211
/// Arguments of the `request_review` outbound method.
#[derive(Debug, Deserialize)]
struct RequestReviewArgs {
    /// Shared client configuration keys.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the pull request.
    pr_url: String,
    /// Reviewer logins, sent as the `reviewers` field.
    reviewers: Vec<String>,
}
219
/// Arguments of the `merge_pr` outbound method.
///
/// Optional string fields are only included in the request body when present.
#[derive(Debug, Deserialize)]
struct MergePrArgs {
    /// Shared client configuration keys.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the pull request to merge.
    pr_url: String,
    /// Optional `commit_title` for the merge request body.
    #[serde(default)]
    commit_title: Option<String>,
    /// Optional `commit_message` for the merge request body.
    #[serde(default)]
    commit_message: Option<String>,
    /// Optional `merge_method` for the merge request body.
    #[serde(default)]
    merge_method: Option<String>,
    /// Optional `sha` for the merge request body.
    #[serde(default)]
    sha: Option<String>,
    /// Not sent to GitHub; when true the returned object gains
    /// `admin_override_requested: true`.
    #[serde(default)]
    admin_override: bool,
}
236
/// Arguments of the `list_stale_prs` outbound method.
#[derive(Debug, Deserialize)]
struct ListStalePrsArgs {
    /// Shared client configuration keys.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// Repository reference, parsed by `parse_repo_ref`.
    repo: String,
    /// Age threshold: open PRs last updated before `now - days` are returned.
    days: i64,
}
244
/// Arguments of the `get_pr_diff` outbound method.
#[derive(Debug, Deserialize)]
struct GetPrDiffArgs {
    /// Shared client configuration keys.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// URL of the pull request whose diff is fetched.
    pr_url: String,
}
251
/// Arguments of the `create_issue` outbound method.
#[derive(Debug, Deserialize)]
struct CreateIssueArgs {
    /// Shared client configuration keys.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// Repository reference, parsed by `parse_repo_ref`.
    repo: String,
    /// Issue title (always sent).
    title: String,
    /// Issue body; omitted from the request when absent.
    #[serde(default)]
    body: Option<String>,
    /// Labels; omitted from the request when absent.
    #[serde(default)]
    labels: Option<Vec<String>>,
}
263
/// Arguments of the generic `api_call` outbound method.
#[derive(Debug, Deserialize)]
struct ApiCallArgs {
    /// Shared client configuration keys.
    #[serde(flatten)]
    config: GitHubClientConfigArgs,
    /// API path, resolved against the configured base URL.
    path: String,
    /// HTTP method name, parsed with `Method::from_bytes`.
    method: String,
    /// Optional JSON request body.
    #[serde(default)]
    body: Option<JsonValue>,
    /// Optional `Accept` header; defaults to `application/vnd.github+json`.
    #[serde(default)]
    accept: Option<String>,
}
275
/// Response body of the installation access-token endpoint.
#[derive(Debug, Deserialize)]
struct InstallationTokenResponse {
    /// The installation access token.
    token: String,
    /// Expiry timestamp text, when the response includes one.
    #[serde(default)]
    expires_at: Option<String>,
}
282
/// Claims of the GitHub App JWT used to request installation tokens.
#[derive(Debug, Serialize)]
struct GitHubJwtClaims {
    /// Issued-at time as a Unix timestamp.
    iat: i64,
    /// Expiry time as a Unix timestamp.
    exp: i64,
    /// Issuer: the App id rendered as a string.
    iss: String,
}
289
/// Owner/name pair identifying a repository.
#[derive(Clone, Debug)]
struct RepoRef {
    /// Account or organization that owns the repository.
    owner: String,
    /// Repository name.
    repo: String,
}
295
/// Reference to an issue or pull request within a repository.
#[derive(Clone, Debug)]
struct IssueRef {
    /// Repository the issue or pull request belongs to.
    repo: RepoRef,
    /// Issue / pull request number.
    number: u64,
}
301
302impl GitHubConnector {
303 pub fn new() -> Self {
304 let state = Arc::new(GitHubConnectorState::default());
305 let client = Arc::new(GitHubClient {
306 provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
307 state: state.clone(),
308 http: crate::connectors::outbound_http_client("harn-github-connector"),
309 tokens: GitHubInstallationTokenStore::new(32),
310 });
311 Self {
312 provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
313 kinds: vec![TriggerKind::from("webhook")],
314 state,
315 client,
316 }
317 }
318
319 fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedGitHubBinding, ConnectorError> {
320 let bindings = self
321 .state
322 .bindings
323 .read()
324 .expect("github connector bindings poisoned");
325 if let Some(binding_id) = raw.metadata.get("binding_id").and_then(JsonValue::as_str) {
326 return bindings.get(binding_id).cloned().ok_or_else(|| {
327 ConnectorError::Unsupported(format!(
328 "github connector has no active binding `{binding_id}`"
329 ))
330 });
331 }
332 if bindings.len() == 1 {
333 return bindings
334 .values()
335 .next()
336 .cloned()
337 .ok_or_else(|| ConnectorError::Activation("github bindings missing".to_string()));
338 }
339 Err(ConnectorError::Unsupported(
340 "github connector requires raw.metadata.binding_id when multiple bindings are active"
341 .to_string(),
342 ))
343 }
344
345 fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
346 self.state
347 .ctx
348 .read()
349 .expect("github connector ctx poisoned")
350 .clone()
351 .ok_or_else(|| {
352 ConnectorError::Activation(
353 "github connector must be initialized before use".to_string(),
354 )
355 })
356 }
357}
358
359impl Default for GitHubConnector {
360 fn default() -> Self {
361 Self::new()
362 }
363}
364
365#[async_trait]
366impl Connector for GitHubConnector {
367 fn provider_id(&self) -> &ProviderId {
368 &self.provider_id
369 }
370
371 fn kinds(&self) -> &[TriggerKind] {
372 &self.kinds
373 }
374
375 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
376 *self
377 .state
378 .ctx
379 .write()
380 .expect("github connector ctx poisoned") = Some(ctx);
381 Ok(())
382 }
383
384 async fn activate(
385 &self,
386 bindings: &[TriggerBinding],
387 ) -> Result<ActivationHandle, ConnectorError> {
388 let mut configured = HashMap::new();
389 let mut paths = BTreeSet::new();
390 for binding in bindings {
391 let activated = ActivatedGitHubBinding::from_binding(binding)?;
392 if let Some(path) = &activated.path {
393 if !paths.insert(path.clone()) {
394 return Err(ConnectorError::Activation(format!(
395 "github connector path `{path}` is configured by multiple bindings"
396 )));
397 }
398 }
399 configured.insert(binding.binding_id.clone(), activated);
400 }
401 *self
402 .state
403 .bindings
404 .write()
405 .expect("github connector bindings poisoned") = configured;
406 Ok(ActivationHandle::new(
407 self.provider_id.clone(),
408 bindings.len(),
409 ))
410 }
411
412 async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
413 let ctx = self.ctx()?;
414 let binding = self.binding_for_raw(&raw)?;
415 let provider = self.provider_id.clone();
416 let received_at = raw.received_at;
417 let headers = effective_headers(&raw.headers);
418 let secret = load_secret_text_blocking(&ctx, &binding.signing_secret)?;
419 futures::executor::block_on(crate::connectors::hmac::verify_hmac_signed(
420 ctx.event_log.as_ref(),
421 &provider,
422 HmacSignatureStyle::github(),
423 &raw.body,
424 &headers,
425 secret.as_str(),
426 None,
427 received_at,
428 ))?;
429
430 let payload = raw.json_body()?;
431 let event_kind = header_value(&headers, "x-github-event")
432 .map(ToString::to_string)
433 .or_else(|| {
434 if raw.kind.trim().is_empty() {
435 None
436 } else {
437 Some(raw.kind.clone())
438 }
439 })
440 .ok_or_else(|| ConnectorError::MissingHeader("X-GitHub-Event".to_string()))?;
441 let _typed = parse_typed_event(&event_kind, &payload)?;
442 let dedupe_key = header_value(&headers, "x-github-delivery")
443 .map(ToString::to_string)
444 .unwrap_or_else(|| fallback_body_digest(&raw.body));
445 if binding.dedupe_enabled {
446 let inserted = ctx
447 .inbox
448 .insert_if_new(&binding.binding_id, &dedupe_key, binding.dedupe_ttl)
449 .await?;
450 if !inserted {
451 return Err(ConnectorError::DuplicateDelivery(format!(
452 "duplicate GitHub delivery `{dedupe_key}` for binding `{}`",
453 binding.binding_id
454 )));
455 }
456 }
457
458 let provider_payload =
459 ProviderPayload::normalize(&provider, &event_kind, &headers, payload)
460 .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
461 Ok(TriggerEvent {
462 id: TriggerEventId::new(),
463 provider,
464 kind: event_kind,
465 received_at,
466 occurred_at: raw.occurred_at,
467 dedupe_key,
468 trace_id: TraceId::new(),
469 tenant_id: raw.tenant_id.clone(),
470 headers: redact_headers(&headers, &HeaderRedactionPolicy::default()),
471 batch: None,
472 raw_body: Some(raw.body.clone()),
473 provider_payload,
474 signature_status: SignatureStatus::Verified,
475 dedupe_claimed: false,
476 })
477 }
478
479 fn payload_schema(&self) -> ProviderPayloadSchema {
480 ProviderPayloadSchema::named("GitHubEventPayload")
481 }
482
483 fn client(&self) -> Arc<dyn ConnectorClient> {
484 self.client.clone()
485 }
486}
487
488#[async_trait]
489impl ConnectorClient for GitHubClient {
490 async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError> {
491 match method {
492 "comment" => {
493 let args: CommentArgs = parse_args(args)?;
494 let config = self.resolve_client_config(&args.config)?;
495 let issue = parse_issue_like_url(&args.issue_url, "issue")?;
496 self.request_json(
497 &config,
498 Method::POST,
499 &format!(
500 "/repos/{}/{}/issues/{}/comments",
501 issue.repo.owner, issue.repo.repo, issue.number
502 ),
503 Some(json!({ "body": args.body })),
504 "application/vnd.github+json",
505 )
506 .await
507 }
508 "add_labels" => {
509 let args: AddLabelsArgs = parse_args(args)?;
510 let config = self.resolve_client_config(&args.config)?;
511 let issue = parse_issue_like_url(&args.issue_url, "issue")?;
512 self.request_json(
513 &config,
514 Method::POST,
515 &format!(
516 "/repos/{}/{}/issues/{}/labels",
517 issue.repo.owner, issue.repo.repo, issue.number
518 ),
519 Some(json!({ "labels": args.labels })),
520 "application/vnd.github+json",
521 )
522 .await
523 }
524 "request_review" => {
525 let args: RequestReviewArgs = parse_args(args)?;
526 let config = self.resolve_client_config(&args.config)?;
527 let pr = parse_issue_like_url(&args.pr_url, "pull")?;
528 self.request_json(
529 &config,
530 Method::POST,
531 &format!(
532 "/repos/{}/{}/pulls/{}/requested_reviewers",
533 pr.repo.owner, pr.repo.repo, pr.number
534 ),
535 Some(json!({ "reviewers": args.reviewers })),
536 "application/vnd.github+json",
537 )
538 .await
539 }
540 "merge_pr" => {
541 let args: MergePrArgs = parse_args(args)?;
542 let config = self.resolve_client_config(&args.config)?;
543 let pr = parse_issue_like_url(&args.pr_url, "pull")?;
544 let mut body = JsonMap::new();
545 if let Some(value) = args.commit_title {
546 body.insert("commit_title".to_string(), JsonValue::String(value));
547 }
548 if let Some(value) = args.commit_message {
549 body.insert("commit_message".to_string(), JsonValue::String(value));
550 }
551 if let Some(value) = args.merge_method {
552 body.insert("merge_method".to_string(), JsonValue::String(value));
553 }
554 if let Some(value) = args.sha {
555 body.insert("sha".to_string(), JsonValue::String(value));
556 }
557 let mut response = self
558 .request_json(
559 &config,
560 Method::PUT,
561 &format!(
562 "/repos/{}/{}/pulls/{}/merge",
563 pr.repo.owner, pr.repo.repo, pr.number
564 ),
565 Some(JsonValue::Object(body)),
566 "application/vnd.github+json",
567 )
568 .await?;
569 if args.admin_override {
570 if let Some(map) = response.as_object_mut() {
571 map.insert(
572 "admin_override_requested".to_string(),
573 JsonValue::Bool(true),
574 );
575 }
576 }
577 Ok(response)
578 }
579 "list_stale_prs" => {
580 let args: ListStalePrsArgs = parse_args(args)?;
581 let config = self.resolve_client_config(&args.config)?;
582 let repo = parse_repo_ref(&args.repo)?;
583 let stale_before = (OffsetDateTime::now_utc() - Duration::days(args.days))
584 .date()
585 .to_string();
586 let query = format!(
587 "repo:{}/{} is:pr is:open updated:<{}",
588 repo.owner, repo.repo, stale_before
589 );
590 let url = Url::parse_with_params(
591 &absolute_api_url(&config.api_base_url, "/search/issues")?,
592 &[("q", query)],
593 )
594 .map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
595 let response = self
596 .request_response(
597 &config,
598 Method::GET,
599 url.to_string(),
600 None,
601 "application/vnd.github+json",
602 )
603 .await?;
604 response
605 .json::<JsonValue>()
606 .await
607 .map_err(|error| ClientError::Transport(error.to_string()))
608 }
609 "get_pr_diff" => {
610 let args: GetPrDiffArgs = parse_args(args)?;
611 let config = self.resolve_client_config(&args.config)?;
612 let pr = parse_issue_like_url(&args.pr_url, "pull")?;
613 let text = self
614 .request_text(
615 &config,
616 Method::GET,
617 &format!(
618 "/repos/{}/{}/pulls/{}",
619 pr.repo.owner, pr.repo.repo, pr.number
620 ),
621 "application/vnd.github.diff",
622 )
623 .await?;
624 Ok(JsonValue::String(text))
625 }
626 "create_issue" => {
627 let args: CreateIssueArgs = parse_args(args)?;
628 let config = self.resolve_client_config(&args.config)?;
629 let repo = parse_repo_ref(&args.repo)?;
630 let mut body = JsonMap::new();
631 body.insert("title".to_string(), JsonValue::String(args.title));
632 if let Some(value) = args.body {
633 body.insert("body".to_string(), JsonValue::String(value));
634 }
635 if let Some(labels) = args.labels {
636 body.insert("labels".to_string(), json!(labels));
637 }
638 self.request_json(
639 &config,
640 Method::POST,
641 &format!("/repos/{}/{}/issues", repo.owner, repo.repo),
642 Some(JsonValue::Object(body)),
643 "application/vnd.github+json",
644 )
645 .await
646 }
647 "api_call" => {
648 let args: ApiCallArgs = parse_args(args)?;
649 let config = self.resolve_client_config(&args.config)?;
650 let method = Method::from_bytes(args.method.as_bytes())
651 .map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
652 self.request_json(
653 &config,
654 method,
655 &args.path,
656 args.body,
657 args.accept
658 .as_deref()
659 .unwrap_or("application/vnd.github+json"),
660 )
661 .await
662 }
663 other => Err(ClientError::MethodNotFound(format!(
664 "github connector does not implement outbound method `{other}`"
665 ))),
666 }
667 }
668}
669
impl GitHubClient {
    /// Turns the optional per-call configuration into a fully resolved one.
    ///
    /// The private key is taken from a secret reference (`private_key_secret`,
    /// else `secrets.private_key`) when that reference parses as a secret id,
    /// otherwise from the inline `private_key_pem`.
    ///
    /// # Errors
    /// `ClientError::InvalidArgs` when `app_id`, `installation_id`, or every
    /// private key source is missing.
    fn resolve_client_config(
        &self,
        args: &GitHubClientConfigArgs,
    ) -> Result<ResolvedGitHubClientConfig, ClientError> {
        let app_id = args.app_id.ok_or_else(|| {
            ClientError::InvalidArgs("github connector requires app_id".to_string())
        })?;
        let installation_id = args.installation_id.ok_or_else(|| {
            ClientError::InvalidArgs("github connector requires installation_id".to_string())
        })?;
        // NOTE(review): a secret reference that is present but fails to parse is
        // silently skipped here and the inline PEM (or the generic error) is used.
        let private_key = if let Some(secret_id) = args
            .private_key_secret
            .as_deref()
            .or(args.secrets.private_key.as_deref())
            .and_then(|value| parse_secret_id(Some(value)))
        {
            PrivateKeySource::Secret(secret_id)
        } else if let Some(pem) = args.private_key_pem.clone() {
            PrivateKeySource::Inline(pem)
        } else {
            return Err(ClientError::InvalidArgs(
                "github connector requires private_key_secret or private_key_pem".to_string(),
            ));
        };
        Ok(ResolvedGitHubClientConfig {
            app_id,
            installation_id,
            api_base_url: args
                .api_base_url
                .clone()
                .unwrap_or_else(|| DEFAULT_API_BASE_URL.to_string()),
            private_key,
        })
    }

    /// Returns a clone of the connector context, or `ClientError::Other` when
    /// the connector has not been initialized yet.
    fn ctx(&self) -> Result<ConnectorCtx, ClientError> {
        self.state
            .ctx
            .read()
            .expect("github connector ctx poisoned")
            .clone()
            .ok_or_else(|| ClientError::Other("github connector must be initialized".to_string()))
    }

    /// Sends an authenticated request to `path` and decodes the response body as JSON.
    ///
    /// Body decoding failures are reported as `ClientError::Transport`.
    async fn request_json(
        &self,
        config: &ResolvedGitHubClientConfig,
        method: Method,
        path: &str,
        body: Option<JsonValue>,
        accept: &str,
    ) -> Result<JsonValue, ClientError> {
        let url = absolute_api_url(&config.api_base_url, path)?;
        let response = self
            .request_response(config, method, url, body, accept)
            .await?;
        response
            .json::<JsonValue>()
            .await
            .map_err(|error| ClientError::Transport(error.to_string()))
    }

    /// Sends an authenticated, body-less request to `path` and returns the
    /// response body as text.
    async fn request_text(
        &self,
        config: &ResolvedGitHubClientConfig,
        method: Method,
        path: &str,
        accept: &str,
    ) -> Result<String, ClientError> {
        let url = absolute_api_url(&config.api_base_url, path)?;
        let response = self
            .request_response(config, method, url, None, accept)
            .await?;
        response
            .text()
            .await
            .map_err(|error| ClientError::Transport(error.to_string()))
    }

    /// Sends one authenticated request and returns the successful response.
    ///
    /// Each attempt obtains an installation token, waits on the per-installation
    /// rate limiter, sends the request and records a rate-limit observation.
    /// The request is retried at most once after a 401 (with the cached token
    /// invalidated first) and at most once after a rate-limited response (after
    /// sleeping for the computed backoff).
    ///
    /// # Errors
    /// `ClientError::Transport` for send failures and non-success statuses,
    /// `ClientError::RateLimited` when the final status counts as rate limited.
    async fn request_response(
        &self,
        config: &ResolvedGitHubClientConfig,
        method: Method,
        url: String,
        body: Option<JsonValue>,
        accept: &str,
    ) -> Result<Response, ClientError> {
        // Each retry reason is allowed once, so the loop runs at most three times.
        let mut retried_401 = false;
        let mut retried_rate_limit = false;
        loop {
            // Fetched inside the loop so a retry after invalidation mints a fresh token.
            let token = self.installation_token(config).await?;
            let token_text = token.with_exposed(|bytes| String::from_utf8_lossy(bytes).to_string());
            let ctx = self.ctx()?;
            ctx.rate_limiter
                .scoped(
                    &self.provider_id,
                    format!("installation:{}", config.installation_id),
                )
                .acquire()
                .await;

            let mut request = self
                .http
                .request(method.clone(), &url)
                .header("Accept", accept)
                .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
                .header("Authorization", format!("Bearer {token_text}"));
            if let Some(payload) = body.clone() {
                request = request.json(&payload);
            }
            let response = request
                .send()
                .await
                .map_err(|error| ClientError::Transport(error.to_string()))?;
            self.record_rate_limit_observation(&response).await;

            // A 401 most likely means the cached token is no longer valid: drop it and retry once.
            if response.status() == StatusCode::UNAUTHORIZED && !retried_401 {
                self.tokens.invalidate(config.installation_id);
                retried_401 = true;
                continue;
            }
            if is_rate_limited(&response) && !retried_rate_limit {
                tokio::time::sleep(rate_limit_backoff(&response)).await;
                retried_rate_limit = true;
                continue;
            }
            if !response.status().is_success() {
                let status = response.status();
                // Best effort: an unreadable error body is reported as empty.
                let body = response.text().await.unwrap_or_default();
                let message = if body.trim().is_empty() {
                    format!("github API request failed with status {status}")
                } else {
                    format!("github API request failed with status {status}: {body}")
                };
                return Err(if is_rate_limited_status(status) {
                    ClientError::RateLimited(message)
                } else {
                    ClientError::Transport(message)
                });
            }
            return Ok(response);
        }
    }

    /// Returns an installation access token, from the cache when one is still
    /// fresh, otherwise by minting an App JWT and calling the access-token
    /// endpoint. A newly fetched token is cached under the installation id.
    ///
    /// # Errors
    /// `ClientError::Transport` for send/decoding failures and non-success
    /// statuses; JWT and secret errors are propagated from `mint_app_jwt`.
    async fn installation_token(
        &self,
        config: &ResolvedGitHubClientConfig,
    ) -> Result<SecretBytes, ClientError> {
        // `now` is captured before the request; the refresh time is derived from it below.
        let now = OffsetDateTime::now_utc();
        if let Some(token) = self.tokens.get(config.installation_id, now) {
            return Ok(token);
        }
        let jwt = self.mint_app_jwt(config).await?;
        let url = absolute_api_url(
            &config.api_base_url,
            &format!(
                "/app/installations/{}/access_tokens",
                config.installation_id
            ),
        )?;
        let ctx = self.ctx()?;
        ctx.rate_limiter
            .scoped(
                &self.provider_id,
                format!("installation:{}", config.installation_id),
            )
            .acquire()
            .await;
        let response = self
            .http
            .post(url)
            .header("Accept", "application/vnd.github+json")
            .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
            .header("Authorization", format!("Bearer {jwt}"))
            .send()
            .await
            .map_err(|error| ClientError::Transport(error.to_string()))?;
        self.record_rate_limit_observation(&response).await;
        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            return Err(ClientError::Transport(format!(
                "failed to create GitHub installation token ({status}): {body}"
            )));
        }
        let payload: InstallationTokenResponse = response
            .json()
            .await
            .map_err(|error| ClientError::Transport(error.to_string()))?;
        let refresh_at = installation_token_refresh_at(payload.expires_at.as_deref(), now);
        let token = SecretBytes::from(payload.token);
        // NOTE(review): `reborrow` presumably yields a second handle to the same
        // secret so one can be cached and one returned — confirm against `SecretBytes`.
        let result = token.reborrow();
        self.tokens.store(config.installation_id, token, refresh_at);
        Ok(result)
    }

    /// Builds and signs (RS256) the GitHub App JWT used to request installation tokens.
    ///
    /// `iat` is set 60 seconds in the past and `exp` 9 minutes in the future;
    /// `iss` is the App id.
    ///
    /// # Errors
    /// `ClientError::Other` when the PEM cannot be parsed or signing fails.
    async fn mint_app_jwt(
        &self,
        config: &ResolvedGitHubClientConfig,
    ) -> Result<String, ClientError> {
        let pem = self.private_key_pem(config).await?;
        let now = OffsetDateTime::now_utc();
        let claims = GitHubJwtClaims {
            iat: (now - Duration::seconds(60)).unix_timestamp(),
            exp: (now + Duration::minutes(9)).unix_timestamp(),
            iss: config.app_id.to_string(),
        };
        let header = Header::new(Algorithm::RS256);
        let key = EncodingKey::from_rsa_pem(pem.as_bytes())
            .map_err(|error| ClientError::Other(error.to_string()))?;
        jsonwebtoken::encode(&header, &claims, &key)
            .map_err(|error| ClientError::Other(error.to_string()))
    }

    /// Returns the App private key as PEM text, either the inline value or the
    /// UTF-8 contents of the referenced secret.
    ///
    /// # Errors
    /// `ClientError::Other` when the secret cannot be loaded or is not valid UTF-8.
    async fn private_key_pem(
        &self,
        config: &ResolvedGitHubClientConfig,
    ) -> Result<String, ClientError> {
        match &config.private_key {
            PrivateKeySource::Inline(pem) => Ok(pem.clone()),
            PrivateKeySource::Secret(id) => {
                let ctx = self.ctx()?;
                let secret = ctx
                    .secrets
                    .get(id)
                    .await
                    .map_err(|error| ClientError::Other(error.to_string()))?;
                secret.with_exposed(|bytes| {
                    std::str::from_utf8(bytes)
                        .map(|value| value.to_string())
                        .map_err(|error| ClientError::Other(error.to_string()))
                })
            }
        }
    }

    /// Appends a `github.rate_limit` event to the event log when the response
    /// reports 10 or fewer remaining requests.
    ///
    /// Best effort: nothing is logged when the `x-ratelimit-remaining` header is
    /// missing or unparsable, when the context or topic is unavailable, and any
    /// append error is ignored.
    async fn record_rate_limit_observation(&self, response: &Response) {
        let remaining = response
            .headers()
            .get("x-ratelimit-remaining")
            .and_then(|value| value.to_str().ok())
            .and_then(|value| value.parse::<u64>().ok());
        let reset = response
            .headers()
            .get("x-ratelimit-reset")
            .and_then(|value| value.to_str().ok())
            .and_then(|value| value.parse::<i64>().ok());
        let Some(remaining) = remaining else {
            return;
        };
        // Only near-exhaustion is worth recording.
        if remaining > 10 {
            return;
        }
        let Ok(ctx) = self.ctx() else {
            return;
        };
        let Ok(topic) = Topic::new(GITHUB_RATE_LIMIT_TOPIC) else {
            return;
        };
        let _ = ctx
            .event_log
            .append(
                &topic,
                LogEvent::new(
                    "github.rate_limit",
                    json!({
                        "remaining": remaining,
                        "reset": reset,
                        "status": response.status().as_u16(),
                    }),
                ),
            )
            .await;
    }
}
946
947impl ActivatedGitHubBinding {
948 fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
949 let config: GitHubBindingConfig =
950 serde_json::from_value(binding.config.clone()).map_err(|error| {
951 ConnectorError::Activation(format!(
952 "github binding `{}` has invalid config: {error}",
953 binding.binding_id
954 ))
955 })?;
956 let signing_secret =
957 parse_secret_id(config.secrets.signing_secret.as_deref()).ok_or_else(|| {
958 ConnectorError::Activation(format!(
959 "github binding `{}` requires secrets.signing_secret",
960 binding.binding_id
961 ))
962 })?;
963 Ok(Self {
964 binding_id: binding.binding_id.clone(),
965 path: config.match_config.path,
966 signing_secret,
967 dedupe_enabled: binding.dedupe_key.is_some(),
968 dedupe_ttl: std::time::Duration::from_secs(
969 u64::from(crate::triggers::DEFAULT_INBOX_RETENTION_DAYS) * 24 * 60 * 60,
970 ),
971 })
972 }
973}
974
975impl GitHubInstallationTokenStore {
976 fn new(capacity: usize) -> Self {
977 Self {
978 capacity: capacity.max(1),
979 state: Mutex::new(TokenCacheState::default()),
980 }
981 }
982
983 fn get(&self, installation_id: u64, now: OffsetDateTime) -> Option<SecretBytes> {
984 let mut state = self.state.lock().expect("github token cache poisoned");
985 let refresh_at = state
986 .entries
987 .get(&installation_id)
988 .map(|entry| entry.refresh_at)?;
989 if refresh_at <= now {
990 state.entries.remove(&installation_id);
991 state.order.retain(|id| *id != installation_id);
992 return None;
993 }
994 touch_lru(&mut state.order, installation_id);
995 state
996 .entries
997 .get(&installation_id)
998 .map(|entry| entry.token.reborrow())
999 }
1000
1001 fn store(&self, installation_id: u64, token: SecretBytes, refresh_at: OffsetDateTime) {
1002 let mut state = self.state.lock().expect("github token cache poisoned");
1003 state.entries.insert(
1004 installation_id,
1005 InstallationTokenEntry { token, refresh_at },
1006 );
1007 touch_lru(&mut state.order, installation_id);
1008 while state.entries.len() > self.capacity {
1009 if let Some(evicted) = state.order.pop_front() {
1010 state.entries.remove(&evicted);
1011 }
1012 }
1013 }
1014
1015 fn invalidate(&self, installation_id: u64) {
1016 let mut state = self.state.lock().expect("github token cache poisoned");
1017 state.entries.remove(&installation_id);
1018 state.order.retain(|id| *id != installation_id);
1019 }
1020}
1021
/// Marks `installation_id` as most recently used: every existing occurrence is
/// removed from `order` and the id is appended at the back.
fn touch_lru(order: &mut VecDeque<u64>, installation_id: u64) {
    let refreshed: VecDeque<u64> = order
        .iter()
        .copied()
        .filter(|&queued| queued != installation_id)
        .chain(std::iter::once(installation_id))
        .collect();
    *order = refreshed;
}
1026
1027fn parse_args<T: DeserializeOwned>(args: JsonValue) -> Result<T, ClientError> {
1028 serde_json::from_value(args).map_err(|error| ClientError::InvalidArgs(error.to_string()))
1029}
1030
1031fn parse_typed_event(kind: &str, payload: &JsonValue) -> Result<ParsedGitHubEvent, ConnectorError> {
1032 match kind {
1033 "issues" => serde_json::from_value(payload.clone())
1034 .map(ParsedGitHubEvent::Issues)
1035 .map_err(|error| ConnectorError::Json(error.to_string())),
1036 "pull_request" => serde_json::from_value(payload.clone())
1037 .map(ParsedGitHubEvent::PullRequest)
1038 .map_err(|error| ConnectorError::Json(error.to_string())),
1039 "issue_comment" => serde_json::from_value(payload.clone())
1040 .map(ParsedGitHubEvent::IssueComment)
1041 .map_err(|error| ConnectorError::Json(error.to_string())),
1042 "pull_request_review" => serde_json::from_value(payload.clone())
1043 .map(ParsedGitHubEvent::PullRequestReview)
1044 .map_err(|error| ConnectorError::Json(error.to_string())),
1045 "push" => serde_json::from_value(payload.clone())
1046 .map(ParsedGitHubEvent::Push)
1047 .map_err(|error| ConnectorError::Json(error.to_string())),
1048 "workflow_run" => serde_json::from_value(payload.clone())
1049 .map(ParsedGitHubEvent::WorkflowRun)
1050 .map_err(|error| ConnectorError::Json(error.to_string())),
1051 "deployment_status" => serde_json::from_value(payload.clone())
1052 .map(ParsedGitHubEvent::DeploymentStatus)
1053 .map_err(|error| ConnectorError::Json(error.to_string())),
1054 "check_run" => serde_json::from_value(payload.clone())
1055 .map(ParsedGitHubEvent::CheckRun)
1056 .map_err(|error| ConnectorError::Json(error.to_string())),
1057 other => Ok(ParsedGitHubEvent::Other {
1058 kind: other.to_string(),
1059 raw: payload.clone(),
1060 }),
1061 }
1062}
1063
1064fn effective_headers(headers: &BTreeMap<String, String>) -> BTreeMap<String, String> {
1065 let mut effective = headers.clone();
1066 canonicalize_header(headers, &mut effective, "content-type", "Content-Type");
1067 canonicalize_header(headers, &mut effective, "x-github-event", "X-GitHub-Event");
1068 canonicalize_header(
1069 headers,
1070 &mut effective,
1071 "x-github-delivery",
1072 "X-GitHub-Delivery",
1073 );
1074 canonicalize_header(
1075 headers,
1076 &mut effective,
1077 "x-github-hook-id",
1078 "X-GitHub-Hook-Id",
1079 );
1080 canonicalize_header(
1081 headers,
1082 &mut effective,
1083 "x-hub-signature-256",
1084 "X-Hub-Signature-256",
1085 );
1086 effective
1087}
1088
1089fn canonicalize_header(
1090 source: &BTreeMap<String, String>,
1091 target: &mut BTreeMap<String, String>,
1092 lookup_name: &str,
1093 canonical_name: &str,
1094) {
1095 if let Some(value) = header_value(source, lookup_name) {
1096 target
1097 .entry(canonical_name.to_string())
1098 .or_insert_with(|| value.to_string());
1099 }
1100}
1101
/// Looks up a header by ASCII case-insensitive name and returns the value of
/// the first match in map (key) order, or `None` when nothing matches.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
1108
1109fn parse_secret_id(raw: Option<&str>) -> Option<SecretId> {
1110 let trimmed = raw?.trim();
1111 if trimmed.is_empty() {
1112 return None;
1113 }
1114 let (base, version) = match trimmed.rsplit_once('@') {
1115 Some((base, version_text)) => {
1116 let version = version_text.parse::<u64>().ok()?;
1117 (base, SecretVersion::Exact(version))
1118 }
1119 None => (trimmed, SecretVersion::Latest),
1120 };
1121 let (namespace, name) = base.split_once('/')?;
1122 if namespace.is_empty() || name.is_empty() {
1123 return None;
1124 }
1125 Some(SecretId::new(namespace, name).with_version(version))
1126}
1127
1128fn load_secret_text_blocking(
1129 ctx: &ConnectorCtx,
1130 secret_id: &SecretId,
1131) -> Result<String, ConnectorError> {
1132 let secret = futures::executor::block_on(ctx.secrets.get(secret_id))?;
1133 secret.with_exposed(|bytes| {
1134 std::str::from_utf8(bytes)
1135 .map(|value| value.to_string())
1136 .map_err(|error| {
1137 ConnectorError::Secret(format!(
1138 "github secret `{secret_id}` is not valid UTF-8: {error}"
1139 ))
1140 })
1141 })
1142}
1143
1144fn fallback_body_digest(body: &[u8]) -> String {
1145 let digest = Sha256::digest(body);
1146 format!("sha256:{}", hex::encode(digest))
1147}
1148
1149fn parse_repo_ref(input: &str) -> Result<RepoRef, ClientError> {
1150 let trimmed = input.trim().trim_matches('/');
1151 if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1152 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1153 if parts.len() >= 2 {
1154 return Ok(RepoRef {
1155 owner: parts[0].to_string(),
1156 repo: parts[1].to_string(),
1157 });
1158 }
1159 }
1160 if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1161 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1162 if parts.len() >= 2 {
1163 return Ok(RepoRef {
1164 owner: parts[0].to_string(),
1165 repo: parts[1].to_string(),
1166 });
1167 }
1168 }
1169 if let Some((owner, repo)) = trimmed.split_once('/') {
1170 if !owner.is_empty() && !repo.is_empty() {
1171 return Ok(RepoRef {
1172 owner: owner.to_string(),
1173 repo: repo.to_string(),
1174 });
1175 }
1176 }
1177 Err(ClientError::InvalidArgs(format!(
1178 "invalid GitHub repository reference `{input}`"
1179 )))
1180}
1181
1182fn parse_issue_like_url(input: &str, expected_kind: &str) -> Result<IssueRef, ClientError> {
1183 let trimmed = input.trim().trim_matches('/');
1184 if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1185 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1186 if parts.len() >= 4 {
1187 let kind = parts[2];
1188 if kind == expected_kind || (expected_kind == "issue" && kind == "issues") {
1189 return Ok(IssueRef {
1190 repo: RepoRef {
1191 owner: parts[0].to_string(),
1192 repo: parts[1].to_string(),
1193 },
1194 number: parts[3].parse().map_err(|error| {
1195 ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1196 })?,
1197 });
1198 }
1199 }
1200 }
1201 if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1202 let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1203 if parts.len() >= 4 {
1204 let kind = parts[2];
1205 if kind == expected_kind
1206 || (expected_kind == "issue" && kind == "issues")
1207 || (expected_kind == "pull" && kind == "pulls")
1208 {
1209 return Ok(IssueRef {
1210 repo: RepoRef {
1211 owner: parts[0].to_string(),
1212 repo: parts[1].to_string(),
1213 },
1214 number: parts[3].parse().map_err(|error| {
1215 ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1216 })?,
1217 });
1218 }
1219 }
1220 }
1221 Err(ClientError::InvalidArgs(format!(
1222 "invalid GitHub {expected_kind} URL `{input}`"
1223 )))
1224}
1225
1226fn absolute_api_url(base_url: &str, path: &str) -> Result<String, ClientError> {
1227 let base = Url::parse(base_url).map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
1228 base.join(path.trim_start_matches('/'))
1229 .map(|value| value.to_string())
1230 .map_err(|error| ClientError::InvalidArgs(error.to_string()))
1231}
1232
1233fn installation_token_refresh_at(expires_at: Option<&str>, now: OffsetDateTime) -> OffsetDateTime {
1234 let eager_refresh = now + Duration::minutes(55);
1235 let Some(expires_at) = expires_at else {
1236 return eager_refresh;
1237 };
1238 let parsed =
1239 OffsetDateTime::parse(expires_at, &time::format_description::well_known::Rfc3339).ok();
1240 parsed
1241 .map(|value| value - Duration::minutes(5))
1242 .map(|value| {
1243 if value < eager_refresh {
1244 value
1245 } else {
1246 eager_refresh
1247 }
1248 })
1249 .unwrap_or(eager_refresh)
1250}
1251
1252fn is_rate_limited(response: &Response) -> bool {
1253 if is_rate_limited_status(response.status()) {
1254 return true;
1255 }
1256 response
1257 .headers()
1258 .get("x-ratelimit-remaining")
1259 .and_then(|value| value.to_str().ok())
1260 .map(|value| value == "0")
1261 .unwrap_or(false)
1262}
1263
1264fn is_rate_limited_status(status: StatusCode) -> bool {
1265 status == StatusCode::TOO_MANY_REQUESTS || status == StatusCode::FORBIDDEN
1266}
1267
1268fn rate_limit_backoff(response: &Response) -> std::time::Duration {
1269 if let Some(delay) = response
1270 .headers()
1271 .get("retry-after")
1272 .and_then(|value| value.to_str().ok())
1273 .and_then(|value| value.parse::<u64>().ok())
1274 {
1275 return std::time::Duration::from_secs(delay);
1276 }
1277 if let Some(reset_at) = response
1278 .headers()
1279 .get("x-ratelimit-reset")
1280 .and_then(|value| value.to_str().ok())
1281 .and_then(|value| value.parse::<i64>().ok())
1282 .and_then(|value| OffsetDateTime::from_unix_timestamp(value).ok())
1283 {
1284 let now = OffsetDateTime::now_utc();
1285 if reset_at > now {
1286 let delta = reset_at - now;
1287 return std::time::Duration::from_secs(delta.whole_seconds().max(0) as u64);
1288 }
1289 }
1290 std::time::Duration::from_millis(100)
1291}