Skip to main content

harn_vm/connectors/github/
mod.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
2use std::sync::{Arc, Mutex, RwLock};
3
4use async_trait::async_trait;
5use jsonwebtoken::{Algorithm, EncodingKey, Header};
6use reqwest::{Method, Response, StatusCode, Url};
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Map as JsonMap, Value as JsonValue};
10use sha2::{Digest, Sha256};
11use time::{Duration, OffsetDateTime};
12
13use crate::connectors::{
14    ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
15    HmacSignatureStyle, ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
16};
17use crate::event_log::{EventLog, LogEvent, Topic};
18use crate::secrets::{SecretBytes, SecretId, SecretVersion};
19use crate::triggers::{
20    redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus, TraceId,
21    TriggerEvent, TriggerEventId,
22};
23
24#[cfg(test)]
25mod tests;
26
/// Provider identifier under which this connector and its client register.
pub const GITHUB_PROVIDER_ID: &str = "github";
/// Event-log topic that receives low-remaining-quota observations.
const GITHUB_RATE_LIMIT_TOPIC: &str = "connectors.github.rate_limit";
/// Value sent in the `X-GitHub-Api-Version` header on every API request.
const GITHUB_API_VERSION: &str = "2022-11-28";
/// API origin used when the caller does not supply `api_base_url`.
const DEFAULT_API_BASE_URL: &str = "https://api.github.com";
31
32pub struct GitHubConnector {
33    provider_id: ProviderId,
34    kinds: Vec<TriggerKind>,
35    state: Arc<GitHubConnectorState>,
36    client: Arc<GitHubClient>,
37}
38
39#[derive(Default)]
40struct GitHubConnectorState {
41    ctx: RwLock<Option<ConnectorCtx>>,
42    bindings: RwLock<HashMap<String, ActivatedGitHubBinding>>,
43}
44
45#[derive(Clone, Debug)]
46struct ActivatedGitHubBinding {
47    binding_id: String,
48    path: Option<String>,
49    signing_secret: SecretId,
50    dedupe_enabled: bool,
51    dedupe_ttl: std::time::Duration,
52}
53
54struct GitHubClient {
55    provider_id: ProviderId,
56    state: Arc<GitHubConnectorState>,
57    http: reqwest::Client,
58    tokens: GitHubInstallationTokenStore,
59}
60
61struct GitHubInstallationTokenStore {
62    capacity: usize,
63    state: Mutex<TokenCacheState>,
64}
65
66#[derive(Default)]
67struct TokenCacheState {
68    entries: HashMap<u64, InstallationTokenEntry>,
69    order: VecDeque<u64>,
70}
71
72struct InstallationTokenEntry {
73    token: SecretBytes,
74    refresh_at: OffsetDateTime,
75}
76
77#[derive(Clone, Debug)]
78struct ResolvedGitHubClientConfig {
79    app_id: u64,
80    installation_id: u64,
81    api_base_url: String,
82    private_key: PrivateKeySource,
83}
84
85#[derive(Clone, Debug)]
86enum PrivateKeySource {
87    Inline(String),
88    Secret(SecretId),
89}
90
91#[allow(dead_code)]
92#[derive(Debug)]
93enum ParsedGitHubEvent {
94    Issues(IssuesEvent),
95    PullRequest(PullRequestEvent),
96    IssueComment(IssueCommentEvent),
97    PullRequestReview(PullRequestReviewEvent),
98    Push(PushEvent),
99    WorkflowRun(WorkflowRunEvent),
100    Other { kind: String, raw: JsonValue },
101}
102
103#[allow(dead_code)]
104#[derive(Debug, Deserialize)]
105struct IssuesEvent {
106    action: Option<String>,
107    issue: JsonValue,
108}
109
110#[allow(dead_code)]
111#[derive(Debug, Deserialize)]
112struct PullRequestEvent {
113    action: Option<String>,
114    pull_request: JsonValue,
115}
116
117#[allow(dead_code)]
118#[derive(Debug, Deserialize)]
119struct IssueCommentEvent {
120    action: Option<String>,
121    comment: JsonValue,
122    issue: JsonValue,
123}
124
125#[allow(dead_code)]
126#[derive(Debug, Deserialize)]
127struct PullRequestReviewEvent {
128    action: Option<String>,
129    review: JsonValue,
130    pull_request: JsonValue,
131}
132
133#[allow(dead_code)]
134#[derive(Debug, Deserialize)]
135struct PushEvent {
136    #[serde(default)]
137    commits: Vec<JsonValue>,
138    #[serde(default)]
139    distinct_size: Option<u64>,
140}
141
142#[allow(dead_code)]
143#[derive(Debug, Deserialize)]
144struct WorkflowRunEvent {
145    action: Option<String>,
146    workflow_run: JsonValue,
147}
148
149#[derive(Debug, Default, Deserialize)]
150struct GitHubBindingConfig {
151    #[serde(default, rename = "match")]
152    match_config: GitHubMatchConfig,
153    #[serde(default)]
154    secrets: GitHubSecretsConfig,
155}
156
157#[derive(Debug, Default, Deserialize)]
158struct GitHubMatchConfig {
159    path: Option<String>,
160}
161
162#[derive(Debug, Default, Deserialize)]
163struct GitHubSecretsConfig {
164    signing_secret: Option<String>,
165    private_key: Option<String>,
166}
167
168#[derive(Debug, Default, Deserialize)]
169struct GitHubClientConfigArgs {
170    app_id: Option<u64>,
171    installation_id: Option<u64>,
172    api_base_url: Option<String>,
173    private_key_pem: Option<String>,
174    private_key_secret: Option<String>,
175    #[serde(default)]
176    secrets: GitHubSecretsConfig,
177}
178
179#[derive(Debug, Deserialize)]
180struct CommentArgs {
181    #[serde(flatten)]
182    config: GitHubClientConfigArgs,
183    issue_url: String,
184    body: String,
185}
186
187#[derive(Debug, Deserialize)]
188struct AddLabelsArgs {
189    #[serde(flatten)]
190    config: GitHubClientConfigArgs,
191    issue_url: String,
192    labels: Vec<String>,
193}
194
195#[derive(Debug, Deserialize)]
196struct RequestReviewArgs {
197    #[serde(flatten)]
198    config: GitHubClientConfigArgs,
199    pr_url: String,
200    reviewers: Vec<String>,
201}
202
203#[derive(Debug, Deserialize)]
204struct MergePrArgs {
205    #[serde(flatten)]
206    config: GitHubClientConfigArgs,
207    pr_url: String,
208    #[serde(default)]
209    commit_title: Option<String>,
210    #[serde(default)]
211    commit_message: Option<String>,
212    #[serde(default)]
213    merge_method: Option<String>,
214    #[serde(default)]
215    sha: Option<String>,
216    #[serde(default)]
217    admin_override: bool,
218}
219
220#[derive(Debug, Deserialize)]
221struct ListStalePrsArgs {
222    #[serde(flatten)]
223    config: GitHubClientConfigArgs,
224    repo: String,
225    days: i64,
226}
227
228#[derive(Debug, Deserialize)]
229struct GetPrDiffArgs {
230    #[serde(flatten)]
231    config: GitHubClientConfigArgs,
232    pr_url: String,
233}
234
235#[derive(Debug, Deserialize)]
236struct CreateIssueArgs {
237    #[serde(flatten)]
238    config: GitHubClientConfigArgs,
239    repo: String,
240    title: String,
241    #[serde(default)]
242    body: Option<String>,
243    #[serde(default)]
244    labels: Option<Vec<String>>,
245}
246
247#[derive(Debug, Deserialize)]
248struct InstallationTokenResponse {
249    token: String,
250    #[serde(default)]
251    expires_at: Option<String>,
252}
253
254#[derive(Debug, Serialize)]
255struct GitHubJwtClaims {
256    iat: i64,
257    exp: i64,
258    iss: String,
259}
260
/// An `owner/repo` pair identifying a GitHub repository.
#[derive(Debug, Clone)]
struct RepoRef {
    /// Account or organization that owns the repository.
    owner: String,
    /// Repository name.
    repo: String,
}

/// A numbered issue or pull request within a repository.
#[derive(Debug, Clone)]
struct IssueRef {
    /// Repository that contains the item.
    repo: RepoRef,
    /// Issue or pull request number.
    number: u64,
}
272
273impl GitHubConnector {
274    pub fn new() -> Self {
275        let state = Arc::new(GitHubConnectorState::default());
276        let client = Arc::new(GitHubClient {
277            provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
278            state: state.clone(),
279            http: reqwest::Client::builder()
280                .user_agent("harn-github-connector")
281                .build()
282                .unwrap_or_else(|_| reqwest::Client::new()),
283            tokens: GitHubInstallationTokenStore::new(32),
284        });
285        Self {
286            provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
287            kinds: vec![TriggerKind::from("webhook")],
288            state,
289            client,
290        }
291    }
292
293    fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedGitHubBinding, ConnectorError> {
294        let bindings = self
295            .state
296            .bindings
297            .read()
298            .expect("github connector bindings poisoned");
299        if let Some(binding_id) = raw.metadata.get("binding_id").and_then(JsonValue::as_str) {
300            return bindings.get(binding_id).cloned().ok_or_else(|| {
301                ConnectorError::Unsupported(format!(
302                    "github connector has no active binding `{binding_id}`"
303                ))
304            });
305        }
306        if bindings.len() == 1 {
307            return bindings
308                .values()
309                .next()
310                .cloned()
311                .ok_or_else(|| ConnectorError::Activation("github bindings missing".to_string()));
312        }
313        Err(ConnectorError::Unsupported(
314            "github connector requires raw.metadata.binding_id when multiple bindings are active"
315                .to_string(),
316        ))
317    }
318
319    fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
320        self.state
321            .ctx
322            .read()
323            .expect("github connector ctx poisoned")
324            .clone()
325            .ok_or_else(|| {
326                ConnectorError::Activation(
327                    "github connector must be initialized before use".to_string(),
328                )
329            })
330    }
331}
332
333impl Default for GitHubConnector {
334    fn default() -> Self {
335        Self::new()
336    }
337}
338
339#[async_trait]
340impl Connector for GitHubConnector {
341    fn provider_id(&self) -> &ProviderId {
342        &self.provider_id
343    }
344
345    fn kinds(&self) -> &[TriggerKind] {
346        &self.kinds
347    }
348
349    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
350        *self
351            .state
352            .ctx
353            .write()
354            .expect("github connector ctx poisoned") = Some(ctx);
355        Ok(())
356    }
357
358    async fn activate(
359        &self,
360        bindings: &[TriggerBinding],
361    ) -> Result<ActivationHandle, ConnectorError> {
362        let mut configured = HashMap::new();
363        let mut paths = BTreeSet::new();
364        for binding in bindings {
365            let activated = ActivatedGitHubBinding::from_binding(binding)?;
366            if let Some(path) = &activated.path {
367                if !paths.insert(path.clone()) {
368                    return Err(ConnectorError::Activation(format!(
369                        "github connector path `{path}` is configured by multiple bindings"
370                    )));
371                }
372            }
373            configured.insert(binding.binding_id.clone(), activated);
374        }
375        *self
376            .state
377            .bindings
378            .write()
379            .expect("github connector bindings poisoned") = configured;
380        Ok(ActivationHandle::new(
381            self.provider_id.clone(),
382            bindings.len(),
383        ))
384    }
385
386    fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
387        let ctx = self.ctx()?;
388        let binding = self.binding_for_raw(&raw)?;
389        let provider = self.provider_id.clone();
390        let received_at = raw.received_at;
391        let headers = effective_headers(&raw.headers);
392        let secret = load_secret_text_blocking(&ctx, &binding.signing_secret)?;
393        futures::executor::block_on(crate::connectors::hmac::verify_hmac_signed(
394            ctx.event_log.as_ref(),
395            &provider,
396            HmacSignatureStyle::github(),
397            &raw.body,
398            &headers,
399            secret.as_str(),
400            None,
401            received_at,
402        ))?;
403
404        let payload = raw.json_body()?;
405        let event_kind = header_value(&headers, "x-github-event")
406            .map(ToString::to_string)
407            .or_else(|| {
408                if raw.kind.trim().is_empty() {
409                    None
410                } else {
411                    Some(raw.kind.clone())
412                }
413            })
414            .ok_or_else(|| ConnectorError::MissingHeader("X-GitHub-Event".to_string()))?;
415        let _typed = parse_typed_event(&event_kind, &payload)?;
416        let dedupe_key = header_value(&headers, "x-github-delivery")
417            .map(ToString::to_string)
418            .unwrap_or_else(|| fallback_body_digest(&raw.body));
419        if binding.dedupe_enabled {
420            let inserted = futures::executor::block_on(ctx.inbox.insert_if_new(
421                &binding.binding_id,
422                &dedupe_key,
423                binding.dedupe_ttl,
424            ))?;
425            if !inserted {
426                return Err(ConnectorError::DuplicateDelivery(format!(
427                    "duplicate GitHub delivery `{dedupe_key}` for binding `{}`",
428                    binding.binding_id
429                )));
430            }
431        }
432
433        let provider_payload =
434            ProviderPayload::normalize(&provider, &event_kind, &headers, payload)
435                .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
436        Ok(TriggerEvent {
437            id: TriggerEventId::new(),
438            provider,
439            kind: event_kind,
440            received_at,
441            occurred_at: raw.occurred_at,
442            dedupe_key,
443            trace_id: TraceId::new(),
444            tenant_id: raw.tenant_id.clone(),
445            headers: redact_headers(&headers, &HeaderRedactionPolicy::default()),
446            batch: None,
447            provider_payload,
448            signature_status: SignatureStatus::Verified,
449            dedupe_claimed: false,
450        })
451    }
452
453    fn payload_schema(&self) -> ProviderPayloadSchema {
454        ProviderPayloadSchema::named("GitHubEventPayload")
455    }
456
457    fn client(&self) -> Arc<dyn ConnectorClient> {
458        self.client.clone()
459    }
460}
461
462#[async_trait]
463impl ConnectorClient for GitHubClient {
464    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError> {
465        match method {
466            "comment" => {
467                let args: CommentArgs = parse_args(args)?;
468                let config = self.resolve_client_config(&args.config)?;
469                let issue = parse_issue_like_url(&args.issue_url, "issue")?;
470                self.request_json(
471                    &config,
472                    Method::POST,
473                    &format!(
474                        "/repos/{}/{}/issues/{}/comments",
475                        issue.repo.owner, issue.repo.repo, issue.number
476                    ),
477                    Some(json!({ "body": args.body })),
478                    "application/vnd.github+json",
479                )
480                .await
481            }
482            "add_labels" => {
483                let args: AddLabelsArgs = parse_args(args)?;
484                let config = self.resolve_client_config(&args.config)?;
485                let issue = parse_issue_like_url(&args.issue_url, "issue")?;
486                self.request_json(
487                    &config,
488                    Method::POST,
489                    &format!(
490                        "/repos/{}/{}/issues/{}/labels",
491                        issue.repo.owner, issue.repo.repo, issue.number
492                    ),
493                    Some(json!({ "labels": args.labels })),
494                    "application/vnd.github+json",
495                )
496                .await
497            }
498            "request_review" => {
499                let args: RequestReviewArgs = parse_args(args)?;
500                let config = self.resolve_client_config(&args.config)?;
501                let pr = parse_issue_like_url(&args.pr_url, "pull")?;
502                self.request_json(
503                    &config,
504                    Method::POST,
505                    &format!(
506                        "/repos/{}/{}/pulls/{}/requested_reviewers",
507                        pr.repo.owner, pr.repo.repo, pr.number
508                    ),
509                    Some(json!({ "reviewers": args.reviewers })),
510                    "application/vnd.github+json",
511                )
512                .await
513            }
514            "merge_pr" => {
515                let args: MergePrArgs = parse_args(args)?;
516                let config = self.resolve_client_config(&args.config)?;
517                let pr = parse_issue_like_url(&args.pr_url, "pull")?;
518                let mut body = JsonMap::new();
519                if let Some(value) = args.commit_title {
520                    body.insert("commit_title".to_string(), JsonValue::String(value));
521                }
522                if let Some(value) = args.commit_message {
523                    body.insert("commit_message".to_string(), JsonValue::String(value));
524                }
525                if let Some(value) = args.merge_method {
526                    body.insert("merge_method".to_string(), JsonValue::String(value));
527                }
528                if let Some(value) = args.sha {
529                    body.insert("sha".to_string(), JsonValue::String(value));
530                }
531                let mut response = self
532                    .request_json(
533                        &config,
534                        Method::PUT,
535                        &format!(
536                            "/repos/{}/{}/pulls/{}/merge",
537                            pr.repo.owner, pr.repo.repo, pr.number
538                        ),
539                        Some(JsonValue::Object(body)),
540                        "application/vnd.github+json",
541                    )
542                    .await?;
543                if args.admin_override {
544                    if let Some(map) = response.as_object_mut() {
545                        map.insert(
546                            "admin_override_requested".to_string(),
547                            JsonValue::Bool(true),
548                        );
549                    }
550                }
551                Ok(response)
552            }
553            "list_stale_prs" => {
554                let args: ListStalePrsArgs = parse_args(args)?;
555                let config = self.resolve_client_config(&args.config)?;
556                let repo = parse_repo_ref(&args.repo)?;
557                let stale_before = (OffsetDateTime::now_utc() - Duration::days(args.days))
558                    .date()
559                    .to_string();
560                let query = format!(
561                    "repo:{}/{} is:pr is:open updated:<{}",
562                    repo.owner, repo.repo, stale_before
563                );
564                let url = Url::parse_with_params(
565                    &absolute_api_url(&config.api_base_url, "/search/issues")?,
566                    &[("q", query)],
567                )
568                .map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
569                let response = self
570                    .request_response(
571                        &config,
572                        Method::GET,
573                        url.to_string(),
574                        None,
575                        "application/vnd.github+json",
576                    )
577                    .await?;
578                response
579                    .json::<JsonValue>()
580                    .await
581                    .map_err(|error| ClientError::Transport(error.to_string()))
582            }
583            "get_pr_diff" => {
584                let args: GetPrDiffArgs = parse_args(args)?;
585                let config = self.resolve_client_config(&args.config)?;
586                let pr = parse_issue_like_url(&args.pr_url, "pull")?;
587                let text = self
588                    .request_text(
589                        &config,
590                        Method::GET,
591                        &format!(
592                            "/repos/{}/{}/pulls/{}",
593                            pr.repo.owner, pr.repo.repo, pr.number
594                        ),
595                        "application/vnd.github.diff",
596                    )
597                    .await?;
598                Ok(JsonValue::String(text))
599            }
600            "create_issue" => {
601                let args: CreateIssueArgs = parse_args(args)?;
602                let config = self.resolve_client_config(&args.config)?;
603                let repo = parse_repo_ref(&args.repo)?;
604                let mut body = JsonMap::new();
605                body.insert("title".to_string(), JsonValue::String(args.title));
606                if let Some(value) = args.body {
607                    body.insert("body".to_string(), JsonValue::String(value));
608                }
609                if let Some(labels) = args.labels {
610                    body.insert("labels".to_string(), json!(labels));
611                }
612                self.request_json(
613                    &config,
614                    Method::POST,
615                    &format!("/repos/{}/{}/issues", repo.owner, repo.repo),
616                    Some(JsonValue::Object(body)),
617                    "application/vnd.github+json",
618                )
619                .await
620            }
621            other => Err(ClientError::MethodNotFound(format!(
622                "github connector does not implement outbound method `{other}`"
623            ))),
624        }
625    }
626}
627
628impl GitHubClient {
629    fn resolve_client_config(
630        &self,
631        args: &GitHubClientConfigArgs,
632    ) -> Result<ResolvedGitHubClientConfig, ClientError> {
633        let app_id = args.app_id.ok_or_else(|| {
634            ClientError::InvalidArgs("github connector requires app_id".to_string())
635        })?;
636        let installation_id = args.installation_id.ok_or_else(|| {
637            ClientError::InvalidArgs("github connector requires installation_id".to_string())
638        })?;
639        let private_key = if let Some(secret_id) = args
640            .private_key_secret
641            .as_deref()
642            .or(args.secrets.private_key.as_deref())
643            .and_then(|value| parse_secret_id(Some(value)))
644        {
645            PrivateKeySource::Secret(secret_id)
646        } else if let Some(pem) = args.private_key_pem.clone() {
647            PrivateKeySource::Inline(pem)
648        } else {
649            return Err(ClientError::InvalidArgs(
650                "github connector requires private_key_secret or private_key_pem".to_string(),
651            ));
652        };
653        Ok(ResolvedGitHubClientConfig {
654            app_id,
655            installation_id,
656            api_base_url: args
657                .api_base_url
658                .clone()
659                .unwrap_or_else(|| DEFAULT_API_BASE_URL.to_string()),
660            private_key,
661        })
662    }
663
664    fn ctx(&self) -> Result<ConnectorCtx, ClientError> {
665        self.state
666            .ctx
667            .read()
668            .expect("github connector ctx poisoned")
669            .clone()
670            .ok_or_else(|| ClientError::Other("github connector must be initialized".to_string()))
671    }
672
673    async fn request_json(
674        &self,
675        config: &ResolvedGitHubClientConfig,
676        method: Method,
677        path: &str,
678        body: Option<JsonValue>,
679        accept: &str,
680    ) -> Result<JsonValue, ClientError> {
681        let url = absolute_api_url(&config.api_base_url, path)?;
682        let response = self
683            .request_response(config, method, url, body, accept)
684            .await?;
685        response
686            .json::<JsonValue>()
687            .await
688            .map_err(|error| ClientError::Transport(error.to_string()))
689    }
690
691    async fn request_text(
692        &self,
693        config: &ResolvedGitHubClientConfig,
694        method: Method,
695        path: &str,
696        accept: &str,
697    ) -> Result<String, ClientError> {
698        let url = absolute_api_url(&config.api_base_url, path)?;
699        let response = self
700            .request_response(config, method, url, None, accept)
701            .await?;
702        response
703            .text()
704            .await
705            .map_err(|error| ClientError::Transport(error.to_string()))
706    }
707
708    async fn request_response(
709        &self,
710        config: &ResolvedGitHubClientConfig,
711        method: Method,
712        url: String,
713        body: Option<JsonValue>,
714        accept: &str,
715    ) -> Result<Response, ClientError> {
716        let mut retried_401 = false;
717        let mut retried_rate_limit = false;
718        loop {
719            let token = self.installation_token(config).await?;
720            let token_text = token.with_exposed(|bytes| String::from_utf8_lossy(bytes).to_string());
721            let ctx = self.ctx()?;
722            ctx.rate_limiter
723                .scoped(
724                    &self.provider_id,
725                    format!("installation:{}", config.installation_id),
726                )
727                .acquire()
728                .await;
729
730            let mut request = self
731                .http
732                .request(method.clone(), &url)
733                .header("Accept", accept)
734                .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
735                .header("Authorization", format!("Bearer {token_text}"));
736            if let Some(payload) = body.clone() {
737                request = request.json(&payload);
738            }
739            let response = request
740                .send()
741                .await
742                .map_err(|error| ClientError::Transport(error.to_string()))?;
743            self.record_rate_limit_observation(&response).await;
744
745            if response.status() == StatusCode::UNAUTHORIZED && !retried_401 {
746                self.tokens.invalidate(config.installation_id);
747                retried_401 = true;
748                continue;
749            }
750            if is_rate_limited(&response) && !retried_rate_limit {
751                tokio::time::sleep(rate_limit_backoff(&response)).await;
752                retried_rate_limit = true;
753                continue;
754            }
755            if !response.status().is_success() {
756                let status = response.status();
757                let body = response.text().await.unwrap_or_default();
758                let message = if body.trim().is_empty() {
759                    format!("github API request failed with status {status}")
760                } else {
761                    format!("github API request failed with status {status}: {body}")
762                };
763                return Err(if is_rate_limited_status(status) {
764                    ClientError::RateLimited(message)
765                } else {
766                    ClientError::Transport(message)
767                });
768            }
769            return Ok(response);
770        }
771    }
772
773    async fn installation_token(
774        &self,
775        config: &ResolvedGitHubClientConfig,
776    ) -> Result<SecretBytes, ClientError> {
777        let now = OffsetDateTime::now_utc();
778        if let Some(token) = self.tokens.get(config.installation_id, now) {
779            return Ok(token);
780        }
781        let jwt = self.mint_app_jwt(config).await?;
782        let url = absolute_api_url(
783            &config.api_base_url,
784            &format!(
785                "/app/installations/{}/access_tokens",
786                config.installation_id
787            ),
788        )?;
789        let ctx = self.ctx()?;
790        ctx.rate_limiter
791            .scoped(
792                &self.provider_id,
793                format!("installation:{}", config.installation_id),
794            )
795            .acquire()
796            .await;
797        let response = self
798            .http
799            .post(url)
800            .header("Accept", "application/vnd.github+json")
801            .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
802            .header("Authorization", format!("Bearer {jwt}"))
803            .send()
804            .await
805            .map_err(|error| ClientError::Transport(error.to_string()))?;
806        self.record_rate_limit_observation(&response).await;
807        if !response.status().is_success() {
808            let status = response.status();
809            let body = response.text().await.unwrap_or_default();
810            return Err(ClientError::Transport(format!(
811                "failed to create GitHub installation token ({status}): {body}"
812            )));
813        }
814        let payload: InstallationTokenResponse = response
815            .json()
816            .await
817            .map_err(|error| ClientError::Transport(error.to_string()))?;
818        let refresh_at = installation_token_refresh_at(payload.expires_at.as_deref(), now);
819        let token = SecretBytes::from(payload.token);
820        let result = token.reborrow();
821        self.tokens.store(config.installation_id, token, refresh_at);
822        Ok(result)
823    }
824
825    async fn mint_app_jwt(
826        &self,
827        config: &ResolvedGitHubClientConfig,
828    ) -> Result<String, ClientError> {
829        let pem = self.private_key_pem(config).await?;
830        let now = OffsetDateTime::now_utc();
831        let claims = GitHubJwtClaims {
832            iat: (now - Duration::seconds(60)).unix_timestamp(),
833            exp: (now + Duration::minutes(9)).unix_timestamp(),
834            iss: config.app_id.to_string(),
835        };
836        let header = Header::new(Algorithm::RS256);
837        let key = EncodingKey::from_rsa_pem(pem.as_bytes())
838            .map_err(|error| ClientError::Other(error.to_string()))?;
839        jsonwebtoken::encode(&header, &claims, &key)
840            .map_err(|error| ClientError::Other(error.to_string()))
841    }
842
843    async fn private_key_pem(
844        &self,
845        config: &ResolvedGitHubClientConfig,
846    ) -> Result<String, ClientError> {
847        match &config.private_key {
848            PrivateKeySource::Inline(pem) => Ok(pem.clone()),
849            PrivateKeySource::Secret(id) => {
850                let ctx = self.ctx()?;
851                let secret = ctx
852                    .secrets
853                    .get(id)
854                    .await
855                    .map_err(|error| ClientError::Other(error.to_string()))?;
856                secret.with_exposed(|bytes| {
857                    std::str::from_utf8(bytes)
858                        .map(|value| value.to_string())
859                        .map_err(|error| ClientError::Other(error.to_string()))
860                })
861            }
862        }
863    }
864
865    async fn record_rate_limit_observation(&self, response: &Response) {
866        let remaining = response
867            .headers()
868            .get("x-ratelimit-remaining")
869            .and_then(|value| value.to_str().ok())
870            .and_then(|value| value.parse::<u64>().ok());
871        let reset = response
872            .headers()
873            .get("x-ratelimit-reset")
874            .and_then(|value| value.to_str().ok())
875            .and_then(|value| value.parse::<i64>().ok());
876        let Some(remaining) = remaining else {
877            return;
878        };
879        if remaining > 10 {
880            return;
881        }
882        let Ok(ctx) = self.ctx() else {
883            return;
884        };
885        let Ok(topic) = Topic::new(GITHUB_RATE_LIMIT_TOPIC) else {
886            return;
887        };
888        let _ = ctx
889            .event_log
890            .append(
891                &topic,
892                LogEvent::new(
893                    "github.rate_limit",
894                    json!({
895                        "remaining": remaining,
896                        "reset": reset,
897                        "status": response.status().as_u16(),
898                    }),
899                ),
900            )
901            .await;
902    }
903}
904
905impl ActivatedGitHubBinding {
906    fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
907        let config: GitHubBindingConfig =
908            serde_json::from_value(binding.config.clone()).map_err(|error| {
909                ConnectorError::Activation(format!(
910                    "github binding `{}` has invalid config: {error}",
911                    binding.binding_id
912                ))
913            })?;
914        let signing_secret =
915            parse_secret_id(config.secrets.signing_secret.as_deref()).ok_or_else(|| {
916                ConnectorError::Activation(format!(
917                    "github binding `{}` requires secrets.signing_secret",
918                    binding.binding_id
919                ))
920            })?;
921        Ok(Self {
922            binding_id: binding.binding_id.clone(),
923            path: config.match_config.path,
924            signing_secret,
925            dedupe_enabled: binding.dedupe_key.is_some(),
926            dedupe_ttl: std::time::Duration::from_secs(
927                u64::from(crate::triggers::DEFAULT_INBOX_RETENTION_DAYS) * 24 * 60 * 60,
928            ),
929        })
930    }
931}
932
933impl GitHubInstallationTokenStore {
934    fn new(capacity: usize) -> Self {
935        Self {
936            capacity: capacity.max(1),
937            state: Mutex::new(TokenCacheState::default()),
938        }
939    }
940
941    fn get(&self, installation_id: u64, now: OffsetDateTime) -> Option<SecretBytes> {
942        let mut state = self.state.lock().expect("github token cache poisoned");
943        let refresh_at = state
944            .entries
945            .get(&installation_id)
946            .map(|entry| entry.refresh_at)?;
947        if refresh_at <= now {
948            state.entries.remove(&installation_id);
949            state.order.retain(|id| *id != installation_id);
950            return None;
951        }
952        touch_lru(&mut state.order, installation_id);
953        state
954            .entries
955            .get(&installation_id)
956            .map(|entry| entry.token.reborrow())
957    }
958
959    fn store(&self, installation_id: u64, token: SecretBytes, refresh_at: OffsetDateTime) {
960        let mut state = self.state.lock().expect("github token cache poisoned");
961        state.entries.insert(
962            installation_id,
963            InstallationTokenEntry { token, refresh_at },
964        );
965        touch_lru(&mut state.order, installation_id);
966        while state.entries.len() > self.capacity {
967            if let Some(evicted) = state.order.pop_front() {
968                state.entries.remove(&evicted);
969            }
970        }
971    }
972
973    fn invalidate(&self, installation_id: u64) {
974        let mut state = self.state.lock().expect("github token cache poisoned");
975        state.entries.remove(&installation_id);
976        state.order.retain(|id| *id != installation_id);
977    }
978}
979
/// Marks `installation_id` as most recently used.
///
/// Every existing occurrence of the id is removed from `order` before it is appended, so the id
/// ends up exactly once, at the back of the queue.
fn touch_lru(order: &mut VecDeque<u64>, installation_id: u64) {
    order.retain(|&queued| queued != installation_id);
    order.extend([installation_id]);
}
984
985fn parse_args<T: DeserializeOwned>(args: JsonValue) -> Result<T, ClientError> {
986    serde_json::from_value(args).map_err(|error| ClientError::InvalidArgs(error.to_string()))
987}
988
989fn parse_typed_event(kind: &str, payload: &JsonValue) -> Result<ParsedGitHubEvent, ConnectorError> {
990    match kind {
991        "issues" => serde_json::from_value(payload.clone())
992            .map(ParsedGitHubEvent::Issues)
993            .map_err(|error| ConnectorError::Json(error.to_string())),
994        "pull_request" => serde_json::from_value(payload.clone())
995            .map(ParsedGitHubEvent::PullRequest)
996            .map_err(|error| ConnectorError::Json(error.to_string())),
997        "issue_comment" => serde_json::from_value(payload.clone())
998            .map(ParsedGitHubEvent::IssueComment)
999            .map_err(|error| ConnectorError::Json(error.to_string())),
1000        "pull_request_review" => serde_json::from_value(payload.clone())
1001            .map(ParsedGitHubEvent::PullRequestReview)
1002            .map_err(|error| ConnectorError::Json(error.to_string())),
1003        "push" => serde_json::from_value(payload.clone())
1004            .map(ParsedGitHubEvent::Push)
1005            .map_err(|error| ConnectorError::Json(error.to_string())),
1006        "workflow_run" => serde_json::from_value(payload.clone())
1007            .map(ParsedGitHubEvent::WorkflowRun)
1008            .map_err(|error| ConnectorError::Json(error.to_string())),
1009        other => Ok(ParsedGitHubEvent::Other {
1010            kind: other.to_string(),
1011            raw: payload.clone(),
1012        }),
1013    }
1014}
1015
1016fn effective_headers(headers: &BTreeMap<String, String>) -> BTreeMap<String, String> {
1017    let mut effective = headers.clone();
1018    canonicalize_header(headers, &mut effective, "content-type", "Content-Type");
1019    canonicalize_header(headers, &mut effective, "x-github-event", "X-GitHub-Event");
1020    canonicalize_header(
1021        headers,
1022        &mut effective,
1023        "x-github-delivery",
1024        "X-GitHub-Delivery",
1025    );
1026    canonicalize_header(
1027        headers,
1028        &mut effective,
1029        "x-github-hook-id",
1030        "X-GitHub-Hook-Id",
1031    );
1032    canonicalize_header(
1033        headers,
1034        &mut effective,
1035        "x-hub-signature-256",
1036        "X-Hub-Signature-256",
1037    );
1038    effective
1039}
1040
1041fn canonicalize_header(
1042    source: &BTreeMap<String, String>,
1043    target: &mut BTreeMap<String, String>,
1044    lookup_name: &str,
1045    canonical_name: &str,
1046) {
1047    if let Some(value) = header_value(source, lookup_name) {
1048        target
1049            .entry(canonical_name.to_string())
1050            .or_insert_with(|| value.to_string());
1051    }
1052}
1053
/// Looks up a header by ASCII-case-insensitive name.
///
/// Returns the value of the first matching key in the map's iteration order, or `None` when no
/// key matches.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
1060
1061fn parse_secret_id(raw: Option<&str>) -> Option<SecretId> {
1062    let trimmed = raw?.trim();
1063    if trimmed.is_empty() {
1064        return None;
1065    }
1066    let (base, version) = match trimmed.rsplit_once('@') {
1067        Some((base, version_text)) => {
1068            let version = version_text.parse::<u64>().ok()?;
1069            (base, SecretVersion::Exact(version))
1070        }
1071        None => (trimmed, SecretVersion::Latest),
1072    };
1073    let (namespace, name) = base.split_once('/')?;
1074    if namespace.is_empty() || name.is_empty() {
1075        return None;
1076    }
1077    Some(SecretId::new(namespace, name).with_version(version))
1078}
1079
1080fn load_secret_text_blocking(
1081    ctx: &ConnectorCtx,
1082    secret_id: &SecretId,
1083) -> Result<String, ConnectorError> {
1084    let secret = futures::executor::block_on(ctx.secrets.get(secret_id))?;
1085    secret.with_exposed(|bytes| {
1086        std::str::from_utf8(bytes)
1087            .map(|value| value.to_string())
1088            .map_err(|error| {
1089                ConnectorError::Secret(format!(
1090                    "github secret `{secret_id}` is not valid UTF-8: {error}"
1091                ))
1092            })
1093    })
1094}
1095
1096fn fallback_body_digest(body: &[u8]) -> String {
1097    let digest = Sha256::digest(body);
1098    format!("sha256:{}", hex::encode(digest))
1099}
1100
1101fn parse_repo_ref(input: &str) -> Result<RepoRef, ClientError> {
1102    let trimmed = input.trim().trim_matches('/');
1103    if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1104        let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1105        if parts.len() >= 2 {
1106            return Ok(RepoRef {
1107                owner: parts[0].to_string(),
1108                repo: parts[1].to_string(),
1109            });
1110        }
1111    }
1112    if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1113        let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1114        if parts.len() >= 2 {
1115            return Ok(RepoRef {
1116                owner: parts[0].to_string(),
1117                repo: parts[1].to_string(),
1118            });
1119        }
1120    }
1121    if let Some((owner, repo)) = trimmed.split_once('/') {
1122        if !owner.is_empty() && !repo.is_empty() {
1123            return Ok(RepoRef {
1124                owner: owner.to_string(),
1125                repo: repo.to_string(),
1126            });
1127        }
1128    }
1129    Err(ClientError::InvalidArgs(format!(
1130        "invalid GitHub repository reference `{input}`"
1131    )))
1132}
1133
1134fn parse_issue_like_url(input: &str, expected_kind: &str) -> Result<IssueRef, ClientError> {
1135    let trimmed = input.trim().trim_matches('/');
1136    if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1137        let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1138        if parts.len() >= 4 {
1139            let kind = parts[2];
1140            if kind == expected_kind || (expected_kind == "issue" && kind == "issues") {
1141                return Ok(IssueRef {
1142                    repo: RepoRef {
1143                        owner: parts[0].to_string(),
1144                        repo: parts[1].to_string(),
1145                    },
1146                    number: parts[3].parse().map_err(|error| {
1147                        ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1148                    })?,
1149                });
1150            }
1151        }
1152    }
1153    if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1154        let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1155        if parts.len() >= 4 {
1156            let kind = parts[2];
1157            if kind == expected_kind
1158                || (expected_kind == "issue" && kind == "issues")
1159                || (expected_kind == "pull" && kind == "pulls")
1160            {
1161                return Ok(IssueRef {
1162                    repo: RepoRef {
1163                        owner: parts[0].to_string(),
1164                        repo: parts[1].to_string(),
1165                    },
1166                    number: parts[3].parse().map_err(|error| {
1167                        ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1168                    })?,
1169                });
1170            }
1171        }
1172    }
1173    Err(ClientError::InvalidArgs(format!(
1174        "invalid GitHub {expected_kind} URL `{input}`"
1175    )))
1176}
1177
1178fn absolute_api_url(base_url: &str, path: &str) -> Result<String, ClientError> {
1179    let base = Url::parse(base_url).map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
1180    base.join(path.trim_start_matches('/'))
1181        .map(|value| value.to_string())
1182        .map_err(|error| ClientError::InvalidArgs(error.to_string()))
1183}
1184
1185fn installation_token_refresh_at(expires_at: Option<&str>, now: OffsetDateTime) -> OffsetDateTime {
1186    let eager_refresh = now + Duration::minutes(55);
1187    let Some(expires_at) = expires_at else {
1188        return eager_refresh;
1189    };
1190    let parsed =
1191        OffsetDateTime::parse(expires_at, &time::format_description::well_known::Rfc3339).ok();
1192    parsed
1193        .map(|value| value - Duration::minutes(5))
1194        .map(|value| {
1195            if value < eager_refresh {
1196                value
1197            } else {
1198                eager_refresh
1199            }
1200        })
1201        .unwrap_or(eager_refresh)
1202}
1203
1204fn is_rate_limited(response: &Response) -> bool {
1205    if is_rate_limited_status(response.status()) {
1206        return true;
1207    }
1208    response
1209        .headers()
1210        .get("x-ratelimit-remaining")
1211        .and_then(|value| value.to_str().ok())
1212        .map(|value| value == "0")
1213        .unwrap_or(false)
1214}
1215
1216fn is_rate_limited_status(status: StatusCode) -> bool {
1217    status == StatusCode::TOO_MANY_REQUESTS || status == StatusCode::FORBIDDEN
1218}
1219
1220fn rate_limit_backoff(response: &Response) -> std::time::Duration {
1221    if let Some(delay) = response
1222        .headers()
1223        .get("retry-after")
1224        .and_then(|value| value.to_str().ok())
1225        .and_then(|value| value.parse::<u64>().ok())
1226    {
1227        return std::time::Duration::from_secs(delay);
1228    }
1229    if let Some(reset_at) = response
1230        .headers()
1231        .get("x-ratelimit-reset")
1232        .and_then(|value| value.to_str().ok())
1233        .and_then(|value| value.parse::<i64>().ok())
1234        .and_then(|value| OffsetDateTime::from_unix_timestamp(value).ok())
1235    {
1236        let now = OffsetDateTime::now_utc();
1237        if reset_at > now {
1238            let delta = reset_at - now;
1239            return std::time::Duration::from_secs(delta.whole_seconds().max(0) as u64);
1240        }
1241    }
1242    std::time::Duration::from_millis(100)
1243}