Skip to main content

harn_vm/connectors/github/
mod.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
2use std::sync::{Arc, Mutex, RwLock};
3
4use async_trait::async_trait;
5use jsonwebtoken::{Algorithm, EncodingKey, Header};
6use reqwest::{Method, Response, StatusCode, Url};
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Map as JsonMap, Value as JsonValue};
10use sha2::{Digest, Sha256};
11use time::{Duration, OffsetDateTime};
12
13use crate::connectors::{
14    ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
15    HmacSignatureStyle, ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
16};
17use crate::event_log::{EventLog, LogEvent, Topic};
18use crate::secrets::{SecretBytes, SecretId, SecretVersion};
19use crate::triggers::{
20    redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus, TraceId,
21    TriggerEvent, TriggerEventId,
22};
23
24#[cfg(test)]
25mod tests;
26
27pub const GITHUB_PROVIDER_ID: &str = "github";
28const GITHUB_RATE_LIMIT_TOPIC: &str = "connectors.github.rate_limit";
29const GITHUB_API_VERSION: &str = "2022-11-28";
30const DEFAULT_API_BASE_URL: &str = "https://api.github.com";
31
32pub struct GitHubConnector {
33    provider_id: ProviderId,
34    kinds: Vec<TriggerKind>,
35    state: Arc<GitHubConnectorState>,
36    client: Arc<GitHubClient>,
37}
38
39#[derive(Default)]
40struct GitHubConnectorState {
41    ctx: RwLock<Option<ConnectorCtx>>,
42    bindings: RwLock<HashMap<String, ActivatedGitHubBinding>>,
43}
44
45#[derive(Clone, Debug)]
46struct ActivatedGitHubBinding {
47    binding_id: String,
48    path: Option<String>,
49    signing_secret: SecretId,
50    dedupe_enabled: bool,
51    dedupe_ttl: std::time::Duration,
52}
53
54struct GitHubClient {
55    provider_id: ProviderId,
56    state: Arc<GitHubConnectorState>,
57    http: reqwest::Client,
58    tokens: GitHubInstallationTokenStore,
59}
60
61struct GitHubInstallationTokenStore {
62    capacity: usize,
63    state: Mutex<TokenCacheState>,
64}
65
66#[derive(Default)]
67struct TokenCacheState {
68    entries: HashMap<u64, InstallationTokenEntry>,
69    order: VecDeque<u64>,
70}
71
72struct InstallationTokenEntry {
73    token: SecretBytes,
74    refresh_at: OffsetDateTime,
75}
76
77#[derive(Clone, Debug)]
78struct ResolvedGitHubClientConfig {
79    app_id: u64,
80    installation_id: u64,
81    api_base_url: String,
82    private_key: PrivateKeySource,
83}
84
85#[derive(Clone, Debug)]
86enum PrivateKeySource {
87    Inline(String),
88    Secret(SecretId),
89}
90
91#[allow(dead_code)]
92#[derive(Debug)]
93enum ParsedGitHubEvent {
94    Issues(IssuesEvent),
95    PullRequest(PullRequestEvent),
96    IssueComment(IssueCommentEvent),
97    PullRequestReview(PullRequestReviewEvent),
98    Push(PushEvent),
99    WorkflowRun(WorkflowRunEvent),
100    DeploymentStatus(DeploymentStatusEvent),
101    CheckRun(CheckRunEvent),
102    Other { kind: String, raw: JsonValue },
103}
104
105#[allow(dead_code)]
106#[derive(Debug, Deserialize)]
107struct IssuesEvent {
108    action: Option<String>,
109    issue: JsonValue,
110}
111
112#[allow(dead_code)]
113#[derive(Debug, Deserialize)]
114struct PullRequestEvent {
115    action: Option<String>,
116    pull_request: JsonValue,
117}
118
119#[allow(dead_code)]
120#[derive(Debug, Deserialize)]
121struct IssueCommentEvent {
122    action: Option<String>,
123    comment: JsonValue,
124    issue: JsonValue,
125}
126
127#[allow(dead_code)]
128#[derive(Debug, Deserialize)]
129struct PullRequestReviewEvent {
130    action: Option<String>,
131    review: JsonValue,
132    pull_request: JsonValue,
133}
134
135#[allow(dead_code)]
136#[derive(Debug, Deserialize)]
137struct PushEvent {
138    #[serde(default)]
139    commits: Vec<JsonValue>,
140    #[serde(default)]
141    distinct_size: Option<u64>,
142}
143
144#[allow(dead_code)]
145#[derive(Debug, Deserialize)]
146struct WorkflowRunEvent {
147    action: Option<String>,
148    workflow_run: JsonValue,
149}
150
151#[allow(dead_code)]
152#[derive(Debug, Deserialize)]
153struct DeploymentStatusEvent {
154    action: Option<String>,
155    deployment_status: JsonValue,
156    deployment: JsonValue,
157}
158
159#[allow(dead_code)]
160#[derive(Debug, Deserialize)]
161struct CheckRunEvent {
162    action: Option<String>,
163    check_run: JsonValue,
164}
165
166#[derive(Debug, Default, Deserialize)]
167struct GitHubBindingConfig {
168    #[serde(default, rename = "match")]
169    match_config: GitHubMatchConfig,
170    #[serde(default)]
171    secrets: GitHubSecretsConfig,
172}
173
174#[derive(Debug, Default, Deserialize)]
175struct GitHubMatchConfig {
176    path: Option<String>,
177}
178
179#[derive(Debug, Default, Deserialize)]
180struct GitHubSecretsConfig {
181    signing_secret: Option<String>,
182    private_key: Option<String>,
183}
184
185#[derive(Debug, Default, Deserialize)]
186struct GitHubClientConfigArgs {
187    app_id: Option<u64>,
188    installation_id: Option<u64>,
189    api_base_url: Option<String>,
190    private_key_pem: Option<String>,
191    private_key_secret: Option<String>,
192    #[serde(default)]
193    secrets: GitHubSecretsConfig,
194}
195
196#[derive(Debug, Deserialize)]
197struct CommentArgs {
198    #[serde(flatten)]
199    config: GitHubClientConfigArgs,
200    issue_url: String,
201    body: String,
202}
203
204#[derive(Debug, Deserialize)]
205struct AddLabelsArgs {
206    #[serde(flatten)]
207    config: GitHubClientConfigArgs,
208    issue_url: String,
209    labels: Vec<String>,
210}
211
212#[derive(Debug, Deserialize)]
213struct RequestReviewArgs {
214    #[serde(flatten)]
215    config: GitHubClientConfigArgs,
216    pr_url: String,
217    reviewers: Vec<String>,
218}
219
220#[derive(Debug, Deserialize)]
221struct MergePrArgs {
222    #[serde(flatten)]
223    config: GitHubClientConfigArgs,
224    pr_url: String,
225    #[serde(default)]
226    commit_title: Option<String>,
227    #[serde(default)]
228    commit_message: Option<String>,
229    #[serde(default)]
230    merge_method: Option<String>,
231    #[serde(default)]
232    sha: Option<String>,
233    #[serde(default)]
234    admin_override: bool,
235}
236
237#[derive(Debug, Deserialize)]
238struct ListStalePrsArgs {
239    #[serde(flatten)]
240    config: GitHubClientConfigArgs,
241    repo: String,
242    days: i64,
243}
244
245#[derive(Debug, Deserialize)]
246struct GetPrDiffArgs {
247    #[serde(flatten)]
248    config: GitHubClientConfigArgs,
249    pr_url: String,
250}
251
252#[derive(Debug, Deserialize)]
253struct CreateIssueArgs {
254    #[serde(flatten)]
255    config: GitHubClientConfigArgs,
256    repo: String,
257    title: String,
258    #[serde(default)]
259    body: Option<String>,
260    #[serde(default)]
261    labels: Option<Vec<String>>,
262}
263
264#[derive(Debug, Deserialize)]
265struct ApiCallArgs {
266    #[serde(flatten)]
267    config: GitHubClientConfigArgs,
268    path: String,
269    method: String,
270    #[serde(default)]
271    body: Option<JsonValue>,
272    #[serde(default)]
273    accept: Option<String>,
274}
275
276#[derive(Debug, Deserialize)]
277struct InstallationTokenResponse {
278    token: String,
279    #[serde(default)]
280    expires_at: Option<String>,
281}
282
283#[derive(Debug, Serialize)]
284struct GitHubJwtClaims {
285    iat: i64,
286    exp: i64,
287    iss: String,
288}
289
290#[derive(Clone, Debug)]
291struct RepoRef {
292    owner: String,
293    repo: String,
294}
295
296#[derive(Clone, Debug)]
297struct IssueRef {
298    repo: RepoRef,
299    number: u64,
300}
301
302impl GitHubConnector {
303    pub fn new() -> Self {
304        let state = Arc::new(GitHubConnectorState::default());
305        let client = Arc::new(GitHubClient {
306            provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
307            state: state.clone(),
308            http: crate::connectors::outbound_http_client("harn-github-connector"),
309            tokens: GitHubInstallationTokenStore::new(32),
310        });
311        Self {
312            provider_id: ProviderId::from(GITHUB_PROVIDER_ID),
313            kinds: vec![TriggerKind::from("webhook")],
314            state,
315            client,
316        }
317    }
318
319    fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedGitHubBinding, ConnectorError> {
320        let bindings = self
321            .state
322            .bindings
323            .read()
324            .expect("github connector bindings poisoned");
325        if let Some(binding_id) = raw.metadata.get("binding_id").and_then(JsonValue::as_str) {
326            return bindings.get(binding_id).cloned().ok_or_else(|| {
327                ConnectorError::Unsupported(format!(
328                    "github connector has no active binding `{binding_id}`"
329                ))
330            });
331        }
332        if bindings.len() == 1 {
333            return bindings
334                .values()
335                .next()
336                .cloned()
337                .ok_or_else(|| ConnectorError::Activation("github bindings missing".to_string()));
338        }
339        Err(ConnectorError::Unsupported(
340            "github connector requires raw.metadata.binding_id when multiple bindings are active"
341                .to_string(),
342        ))
343    }
344
345    fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
346        self.state
347            .ctx
348            .read()
349            .expect("github connector ctx poisoned")
350            .clone()
351            .ok_or_else(|| {
352                ConnectorError::Activation(
353                    "github connector must be initialized before use".to_string(),
354                )
355            })
356    }
357}
358
359impl Default for GitHubConnector {
360    fn default() -> Self {
361        Self::new()
362    }
363}
364
365#[async_trait]
366impl Connector for GitHubConnector {
367    fn provider_id(&self) -> &ProviderId {
368        &self.provider_id
369    }
370
371    fn kinds(&self) -> &[TriggerKind] {
372        &self.kinds
373    }
374
375    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
376        *self
377            .state
378            .ctx
379            .write()
380            .expect("github connector ctx poisoned") = Some(ctx);
381        Ok(())
382    }
383
384    async fn activate(
385        &self,
386        bindings: &[TriggerBinding],
387    ) -> Result<ActivationHandle, ConnectorError> {
388        let mut configured = HashMap::new();
389        let mut paths = BTreeSet::new();
390        for binding in bindings {
391            let activated = ActivatedGitHubBinding::from_binding(binding)?;
392            if let Some(path) = &activated.path {
393                if !paths.insert(path.clone()) {
394                    return Err(ConnectorError::Activation(format!(
395                        "github connector path `{path}` is configured by multiple bindings"
396                    )));
397                }
398            }
399            configured.insert(binding.binding_id.clone(), activated);
400        }
401        *self
402            .state
403            .bindings
404            .write()
405            .expect("github connector bindings poisoned") = configured;
406        Ok(ActivationHandle::new(
407            self.provider_id.clone(),
408            bindings.len(),
409        ))
410    }
411
412    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
413        let ctx = self.ctx()?;
414        let binding = self.binding_for_raw(&raw)?;
415        let provider = self.provider_id.clone();
416        let received_at = raw.received_at;
417        let headers = effective_headers(&raw.headers);
418        let secret = load_secret_text_blocking(&ctx, &binding.signing_secret)?;
419        futures::executor::block_on(crate::connectors::hmac::verify_hmac_signed(
420            ctx.event_log.as_ref(),
421            &provider,
422            HmacSignatureStyle::github(),
423            &raw.body,
424            &headers,
425            secret.as_str(),
426            None,
427            received_at,
428        ))?;
429
430        let payload = raw.json_body()?;
431        let event_kind = header_value(&headers, "x-github-event")
432            .map(ToString::to_string)
433            .or_else(|| {
434                if raw.kind.trim().is_empty() {
435                    None
436                } else {
437                    Some(raw.kind.clone())
438                }
439            })
440            .ok_or_else(|| ConnectorError::MissingHeader("X-GitHub-Event".to_string()))?;
441        let _typed = parse_typed_event(&event_kind, &payload)?;
442        let dedupe_key = header_value(&headers, "x-github-delivery")
443            .map(ToString::to_string)
444            .unwrap_or_else(|| fallback_body_digest(&raw.body));
445        if binding.dedupe_enabled {
446            let inserted = ctx
447                .inbox
448                .insert_if_new(&binding.binding_id, &dedupe_key, binding.dedupe_ttl)
449                .await?;
450            if !inserted {
451                return Err(ConnectorError::DuplicateDelivery(format!(
452                    "duplicate GitHub delivery `{dedupe_key}` for binding `{}`",
453                    binding.binding_id
454                )));
455            }
456        }
457
458        let provider_payload =
459            ProviderPayload::normalize(&provider, &event_kind, &headers, payload)
460                .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
461        Ok(TriggerEvent {
462            id: TriggerEventId::new(),
463            provider,
464            kind: event_kind,
465            received_at,
466            occurred_at: raw.occurred_at,
467            dedupe_key,
468            trace_id: TraceId::new(),
469            tenant_id: raw.tenant_id.clone(),
470            headers: redact_headers(&headers, &HeaderRedactionPolicy::default()),
471            batch: None,
472            raw_body: Some(raw.body.clone()),
473            provider_payload,
474            signature_status: SignatureStatus::Verified,
475            dedupe_claimed: false,
476        })
477    }
478
479    fn payload_schema(&self) -> ProviderPayloadSchema {
480        ProviderPayloadSchema::named("GitHubEventPayload")
481    }
482
483    fn client(&self) -> Arc<dyn ConnectorClient> {
484        self.client.clone()
485    }
486}
487
488#[async_trait]
489impl ConnectorClient for GitHubClient {
490    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError> {
491        match method {
492            "comment" => {
493                let args: CommentArgs = parse_args(args)?;
494                let config = self.resolve_client_config(&args.config)?;
495                let issue = parse_issue_like_url(&args.issue_url, "issue")?;
496                self.request_json(
497                    &config,
498                    Method::POST,
499                    &format!(
500                        "/repos/{}/{}/issues/{}/comments",
501                        issue.repo.owner, issue.repo.repo, issue.number
502                    ),
503                    Some(json!({ "body": args.body })),
504                    "application/vnd.github+json",
505                )
506                .await
507            }
508            "add_labels" => {
509                let args: AddLabelsArgs = parse_args(args)?;
510                let config = self.resolve_client_config(&args.config)?;
511                let issue = parse_issue_like_url(&args.issue_url, "issue")?;
512                self.request_json(
513                    &config,
514                    Method::POST,
515                    &format!(
516                        "/repos/{}/{}/issues/{}/labels",
517                        issue.repo.owner, issue.repo.repo, issue.number
518                    ),
519                    Some(json!({ "labels": args.labels })),
520                    "application/vnd.github+json",
521                )
522                .await
523            }
524            "request_review" => {
525                let args: RequestReviewArgs = parse_args(args)?;
526                let config = self.resolve_client_config(&args.config)?;
527                let pr = parse_issue_like_url(&args.pr_url, "pull")?;
528                self.request_json(
529                    &config,
530                    Method::POST,
531                    &format!(
532                        "/repos/{}/{}/pulls/{}/requested_reviewers",
533                        pr.repo.owner, pr.repo.repo, pr.number
534                    ),
535                    Some(json!({ "reviewers": args.reviewers })),
536                    "application/vnd.github+json",
537                )
538                .await
539            }
540            "merge_pr" => {
541                let args: MergePrArgs = parse_args(args)?;
542                let config = self.resolve_client_config(&args.config)?;
543                let pr = parse_issue_like_url(&args.pr_url, "pull")?;
544                let mut body = JsonMap::new();
545                if let Some(value) = args.commit_title {
546                    body.insert("commit_title".to_string(), JsonValue::String(value));
547                }
548                if let Some(value) = args.commit_message {
549                    body.insert("commit_message".to_string(), JsonValue::String(value));
550                }
551                if let Some(value) = args.merge_method {
552                    body.insert("merge_method".to_string(), JsonValue::String(value));
553                }
554                if let Some(value) = args.sha {
555                    body.insert("sha".to_string(), JsonValue::String(value));
556                }
557                let mut response = self
558                    .request_json(
559                        &config,
560                        Method::PUT,
561                        &format!(
562                            "/repos/{}/{}/pulls/{}/merge",
563                            pr.repo.owner, pr.repo.repo, pr.number
564                        ),
565                        Some(JsonValue::Object(body)),
566                        "application/vnd.github+json",
567                    )
568                    .await?;
569                if args.admin_override {
570                    if let Some(map) = response.as_object_mut() {
571                        map.insert(
572                            "admin_override_requested".to_string(),
573                            JsonValue::Bool(true),
574                        );
575                    }
576                }
577                Ok(response)
578            }
579            "list_stale_prs" => {
580                let args: ListStalePrsArgs = parse_args(args)?;
581                let config = self.resolve_client_config(&args.config)?;
582                let repo = parse_repo_ref(&args.repo)?;
583                let stale_before = (OffsetDateTime::now_utc() - Duration::days(args.days))
584                    .date()
585                    .to_string();
586                let query = format!(
587                    "repo:{}/{} is:pr is:open updated:<{}",
588                    repo.owner, repo.repo, stale_before
589                );
590                let url = Url::parse_with_params(
591                    &absolute_api_url(&config.api_base_url, "/search/issues")?,
592                    &[("q", query)],
593                )
594                .map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
595                let response = self
596                    .request_response(
597                        &config,
598                        Method::GET,
599                        url.to_string(),
600                        None,
601                        "application/vnd.github+json",
602                    )
603                    .await?;
604                response
605                    .json::<JsonValue>()
606                    .await
607                    .map_err(|error| ClientError::Transport(error.to_string()))
608            }
609            "get_pr_diff" => {
610                let args: GetPrDiffArgs = parse_args(args)?;
611                let config = self.resolve_client_config(&args.config)?;
612                let pr = parse_issue_like_url(&args.pr_url, "pull")?;
613                let text = self
614                    .request_text(
615                        &config,
616                        Method::GET,
617                        &format!(
618                            "/repos/{}/{}/pulls/{}",
619                            pr.repo.owner, pr.repo.repo, pr.number
620                        ),
621                        "application/vnd.github.diff",
622                    )
623                    .await?;
624                Ok(JsonValue::String(text))
625            }
626            "create_issue" => {
627                let args: CreateIssueArgs = parse_args(args)?;
628                let config = self.resolve_client_config(&args.config)?;
629                let repo = parse_repo_ref(&args.repo)?;
630                let mut body = JsonMap::new();
631                body.insert("title".to_string(), JsonValue::String(args.title));
632                if let Some(value) = args.body {
633                    body.insert("body".to_string(), JsonValue::String(value));
634                }
635                if let Some(labels) = args.labels {
636                    body.insert("labels".to_string(), json!(labels));
637                }
638                self.request_json(
639                    &config,
640                    Method::POST,
641                    &format!("/repos/{}/{}/issues", repo.owner, repo.repo),
642                    Some(JsonValue::Object(body)),
643                    "application/vnd.github+json",
644                )
645                .await
646            }
647            "api_call" => {
648                let args: ApiCallArgs = parse_args(args)?;
649                let config = self.resolve_client_config(&args.config)?;
650                let method = Method::from_bytes(args.method.as_bytes())
651                    .map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
652                self.request_json(
653                    &config,
654                    method,
655                    &args.path,
656                    args.body,
657                    args.accept
658                        .as_deref()
659                        .unwrap_or("application/vnd.github+json"),
660                )
661                .await
662            }
663            other => Err(ClientError::MethodNotFound(format!(
664                "github connector does not implement outbound method `{other}`"
665            ))),
666        }
667    }
668}
669
670impl GitHubClient {
671    fn resolve_client_config(
672        &self,
673        args: &GitHubClientConfigArgs,
674    ) -> Result<ResolvedGitHubClientConfig, ClientError> {
675        let app_id = args.app_id.ok_or_else(|| {
676            ClientError::InvalidArgs("github connector requires app_id".to_string())
677        })?;
678        let installation_id = args.installation_id.ok_or_else(|| {
679            ClientError::InvalidArgs("github connector requires installation_id".to_string())
680        })?;
681        let private_key = if let Some(secret_id) = args
682            .private_key_secret
683            .as_deref()
684            .or(args.secrets.private_key.as_deref())
685            .and_then(|value| parse_secret_id(Some(value)))
686        {
687            PrivateKeySource::Secret(secret_id)
688        } else if let Some(pem) = args.private_key_pem.clone() {
689            PrivateKeySource::Inline(pem)
690        } else {
691            return Err(ClientError::InvalidArgs(
692                "github connector requires private_key_secret or private_key_pem".to_string(),
693            ));
694        };
695        Ok(ResolvedGitHubClientConfig {
696            app_id,
697            installation_id,
698            api_base_url: args
699                .api_base_url
700                .clone()
701                .unwrap_or_else(|| DEFAULT_API_BASE_URL.to_string()),
702            private_key,
703        })
704    }
705
706    fn ctx(&self) -> Result<ConnectorCtx, ClientError> {
707        self.state
708            .ctx
709            .read()
710            .expect("github connector ctx poisoned")
711            .clone()
712            .ok_or_else(|| ClientError::Other("github connector must be initialized".to_string()))
713    }
714
715    async fn request_json(
716        &self,
717        config: &ResolvedGitHubClientConfig,
718        method: Method,
719        path: &str,
720        body: Option<JsonValue>,
721        accept: &str,
722    ) -> Result<JsonValue, ClientError> {
723        let url = absolute_api_url(&config.api_base_url, path)?;
724        let response = self
725            .request_response(config, method, url, body, accept)
726            .await?;
727        response
728            .json::<JsonValue>()
729            .await
730            .map_err(|error| ClientError::Transport(error.to_string()))
731    }
732
733    async fn request_text(
734        &self,
735        config: &ResolvedGitHubClientConfig,
736        method: Method,
737        path: &str,
738        accept: &str,
739    ) -> Result<String, ClientError> {
740        let url = absolute_api_url(&config.api_base_url, path)?;
741        let response = self
742            .request_response(config, method, url, None, accept)
743            .await?;
744        response
745            .text()
746            .await
747            .map_err(|error| ClientError::Transport(error.to_string()))
748    }
749
750    async fn request_response(
751        &self,
752        config: &ResolvedGitHubClientConfig,
753        method: Method,
754        url: String,
755        body: Option<JsonValue>,
756        accept: &str,
757    ) -> Result<Response, ClientError> {
758        let mut retried_401 = false;
759        let mut retried_rate_limit = false;
760        loop {
761            let token = self.installation_token(config).await?;
762            let token_text = token.with_exposed(|bytes| String::from_utf8_lossy(bytes).to_string());
763            let ctx = self.ctx()?;
764            ctx.rate_limiter
765                .scoped(
766                    &self.provider_id,
767                    format!("installation:{}", config.installation_id),
768                )
769                .acquire()
770                .await;
771
772            let mut request = self
773                .http
774                .request(method.clone(), &url)
775                .header("Accept", accept)
776                .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
777                .header("Authorization", format!("Bearer {token_text}"));
778            if let Some(payload) = body.clone() {
779                request = request.json(&payload);
780            }
781            let response = request
782                .send()
783                .await
784                .map_err(|error| ClientError::Transport(error.to_string()))?;
785            self.record_rate_limit_observation(&response).await;
786
787            if response.status() == StatusCode::UNAUTHORIZED && !retried_401 {
788                self.tokens.invalidate(config.installation_id);
789                retried_401 = true;
790                continue;
791            }
792            if is_rate_limited(&response) && !retried_rate_limit {
793                tokio::time::sleep(rate_limit_backoff(&response)).await;
794                retried_rate_limit = true;
795                continue;
796            }
797            if !response.status().is_success() {
798                let status = response.status();
799                let body = response.text().await.unwrap_or_default();
800                let message = if body.trim().is_empty() {
801                    format!("github API request failed with status {status}")
802                } else {
803                    format!("github API request failed with status {status}: {body}")
804                };
805                return Err(if is_rate_limited_status(status) {
806                    ClientError::RateLimited(message)
807                } else {
808                    ClientError::Transport(message)
809                });
810            }
811            return Ok(response);
812        }
813    }
814
815    async fn installation_token(
816        &self,
817        config: &ResolvedGitHubClientConfig,
818    ) -> Result<SecretBytes, ClientError> {
819        let now = OffsetDateTime::now_utc();
820        if let Some(token) = self.tokens.get(config.installation_id, now) {
821            return Ok(token);
822        }
823        let jwt = self.mint_app_jwt(config).await?;
824        let url = absolute_api_url(
825            &config.api_base_url,
826            &format!(
827                "/app/installations/{}/access_tokens",
828                config.installation_id
829            ),
830        )?;
831        let ctx = self.ctx()?;
832        ctx.rate_limiter
833            .scoped(
834                &self.provider_id,
835                format!("installation:{}", config.installation_id),
836            )
837            .acquire()
838            .await;
839        let response = self
840            .http
841            .post(url)
842            .header("Accept", "application/vnd.github+json")
843            .header("X-GitHub-Api-Version", GITHUB_API_VERSION)
844            .header("Authorization", format!("Bearer {jwt}"))
845            .send()
846            .await
847            .map_err(|error| ClientError::Transport(error.to_string()))?;
848        self.record_rate_limit_observation(&response).await;
849        if !response.status().is_success() {
850            let status = response.status();
851            let body = response.text().await.unwrap_or_default();
852            return Err(ClientError::Transport(format!(
853                "failed to create GitHub installation token ({status}): {body}"
854            )));
855        }
856        let payload: InstallationTokenResponse = response
857            .json()
858            .await
859            .map_err(|error| ClientError::Transport(error.to_string()))?;
860        let refresh_at = installation_token_refresh_at(payload.expires_at.as_deref(), now);
861        let token = SecretBytes::from(payload.token);
862        let result = token.reborrow();
863        self.tokens.store(config.installation_id, token, refresh_at);
864        Ok(result)
865    }
866
867    async fn mint_app_jwt(
868        &self,
869        config: &ResolvedGitHubClientConfig,
870    ) -> Result<String, ClientError> {
871        let pem = self.private_key_pem(config).await?;
872        let now = OffsetDateTime::now_utc();
873        let claims = GitHubJwtClaims {
874            iat: (now - Duration::seconds(60)).unix_timestamp(),
875            exp: (now + Duration::minutes(9)).unix_timestamp(),
876            iss: config.app_id.to_string(),
877        };
878        let header = Header::new(Algorithm::RS256);
879        let key = EncodingKey::from_rsa_pem(pem.as_bytes())
880            .map_err(|error| ClientError::Other(error.to_string()))?;
881        jsonwebtoken::encode(&header, &claims, &key)
882            .map_err(|error| ClientError::Other(error.to_string()))
883    }
884
885    async fn private_key_pem(
886        &self,
887        config: &ResolvedGitHubClientConfig,
888    ) -> Result<String, ClientError> {
889        match &config.private_key {
890            PrivateKeySource::Inline(pem) => Ok(pem.clone()),
891            PrivateKeySource::Secret(id) => {
892                let ctx = self.ctx()?;
893                let secret = ctx
894                    .secrets
895                    .get(id)
896                    .await
897                    .map_err(|error| ClientError::Other(error.to_string()))?;
898                secret.with_exposed(|bytes| {
899                    std::str::from_utf8(bytes)
900                        .map(|value| value.to_string())
901                        .map_err(|error| ClientError::Other(error.to_string()))
902                })
903            }
904        }
905    }
906
907    async fn record_rate_limit_observation(&self, response: &Response) {
908        let remaining = response
909            .headers()
910            .get("x-ratelimit-remaining")
911            .and_then(|value| value.to_str().ok())
912            .and_then(|value| value.parse::<u64>().ok());
913        let reset = response
914            .headers()
915            .get("x-ratelimit-reset")
916            .and_then(|value| value.to_str().ok())
917            .and_then(|value| value.parse::<i64>().ok());
918        let Some(remaining) = remaining else {
919            return;
920        };
921        if remaining > 10 {
922            return;
923        }
924        let Ok(ctx) = self.ctx() else {
925            return;
926        };
927        let Ok(topic) = Topic::new(GITHUB_RATE_LIMIT_TOPIC) else {
928            return;
929        };
930        let _ = ctx
931            .event_log
932            .append(
933                &topic,
934                LogEvent::new(
935                    "github.rate_limit",
936                    json!({
937                        "remaining": remaining,
938                        "reset": reset,
939                        "status": response.status().as_u16(),
940                    }),
941                ),
942            )
943            .await;
944    }
945}
946
947impl ActivatedGitHubBinding {
948    fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
949        let config: GitHubBindingConfig =
950            serde_json::from_value(binding.config.clone()).map_err(|error| {
951                ConnectorError::Activation(format!(
952                    "github binding `{}` has invalid config: {error}",
953                    binding.binding_id
954                ))
955            })?;
956        let signing_secret =
957            parse_secret_id(config.secrets.signing_secret.as_deref()).ok_or_else(|| {
958                ConnectorError::Activation(format!(
959                    "github binding `{}` requires secrets.signing_secret",
960                    binding.binding_id
961                ))
962            })?;
963        Ok(Self {
964            binding_id: binding.binding_id.clone(),
965            path: config.match_config.path,
966            signing_secret,
967            dedupe_enabled: binding.dedupe_key.is_some(),
968            dedupe_ttl: std::time::Duration::from_secs(
969                u64::from(crate::triggers::DEFAULT_INBOX_RETENTION_DAYS) * 24 * 60 * 60,
970            ),
971        })
972    }
973}
974
975impl GitHubInstallationTokenStore {
976    fn new(capacity: usize) -> Self {
977        Self {
978            capacity: capacity.max(1),
979            state: Mutex::new(TokenCacheState::default()),
980        }
981    }
982
983    fn get(&self, installation_id: u64, now: OffsetDateTime) -> Option<SecretBytes> {
984        let mut state = self.state.lock().expect("github token cache poisoned");
985        let refresh_at = state
986            .entries
987            .get(&installation_id)
988            .map(|entry| entry.refresh_at)?;
989        if refresh_at <= now {
990            state.entries.remove(&installation_id);
991            state.order.retain(|id| *id != installation_id);
992            return None;
993        }
994        touch_lru(&mut state.order, installation_id);
995        state
996            .entries
997            .get(&installation_id)
998            .map(|entry| entry.token.reborrow())
999    }
1000
1001    fn store(&self, installation_id: u64, token: SecretBytes, refresh_at: OffsetDateTime) {
1002        let mut state = self.state.lock().expect("github token cache poisoned");
1003        state.entries.insert(
1004            installation_id,
1005            InstallationTokenEntry { token, refresh_at },
1006        );
1007        touch_lru(&mut state.order, installation_id);
1008        while state.entries.len() > self.capacity {
1009            if let Some(evicted) = state.order.pop_front() {
1010                state.entries.remove(&evicted);
1011            }
1012        }
1013    }
1014
1015    fn invalidate(&self, installation_id: u64) {
1016        let mut state = self.state.lock().expect("github token cache poisoned");
1017        state.entries.remove(&installation_id);
1018        state.order.retain(|id| *id != installation_id);
1019    }
1020}
1021
1022fn touch_lru(order: &mut VecDeque<u64>, installation_id: u64) {
1023    order.retain(|id| *id != installation_id);
1024    order.push_back(installation_id);
1025}
1026
1027fn parse_args<T: DeserializeOwned>(args: JsonValue) -> Result<T, ClientError> {
1028    serde_json::from_value(args).map_err(|error| ClientError::InvalidArgs(error.to_string()))
1029}
1030
1031fn parse_typed_event(kind: &str, payload: &JsonValue) -> Result<ParsedGitHubEvent, ConnectorError> {
1032    match kind {
1033        "issues" => serde_json::from_value(payload.clone())
1034            .map(ParsedGitHubEvent::Issues)
1035            .map_err(|error| ConnectorError::Json(error.to_string())),
1036        "pull_request" => serde_json::from_value(payload.clone())
1037            .map(ParsedGitHubEvent::PullRequest)
1038            .map_err(|error| ConnectorError::Json(error.to_string())),
1039        "issue_comment" => serde_json::from_value(payload.clone())
1040            .map(ParsedGitHubEvent::IssueComment)
1041            .map_err(|error| ConnectorError::Json(error.to_string())),
1042        "pull_request_review" => serde_json::from_value(payload.clone())
1043            .map(ParsedGitHubEvent::PullRequestReview)
1044            .map_err(|error| ConnectorError::Json(error.to_string())),
1045        "push" => serde_json::from_value(payload.clone())
1046            .map(ParsedGitHubEvent::Push)
1047            .map_err(|error| ConnectorError::Json(error.to_string())),
1048        "workflow_run" => serde_json::from_value(payload.clone())
1049            .map(ParsedGitHubEvent::WorkflowRun)
1050            .map_err(|error| ConnectorError::Json(error.to_string())),
1051        "deployment_status" => serde_json::from_value(payload.clone())
1052            .map(ParsedGitHubEvent::DeploymentStatus)
1053            .map_err(|error| ConnectorError::Json(error.to_string())),
1054        "check_run" => serde_json::from_value(payload.clone())
1055            .map(ParsedGitHubEvent::CheckRun)
1056            .map_err(|error| ConnectorError::Json(error.to_string())),
1057        other => Ok(ParsedGitHubEvent::Other {
1058            kind: other.to_string(),
1059            raw: payload.clone(),
1060        }),
1061    }
1062}
1063
1064fn effective_headers(headers: &BTreeMap<String, String>) -> BTreeMap<String, String> {
1065    let mut effective = headers.clone();
1066    canonicalize_header(headers, &mut effective, "content-type", "Content-Type");
1067    canonicalize_header(headers, &mut effective, "x-github-event", "X-GitHub-Event");
1068    canonicalize_header(
1069        headers,
1070        &mut effective,
1071        "x-github-delivery",
1072        "X-GitHub-Delivery",
1073    );
1074    canonicalize_header(
1075        headers,
1076        &mut effective,
1077        "x-github-hook-id",
1078        "X-GitHub-Hook-Id",
1079    );
1080    canonicalize_header(
1081        headers,
1082        &mut effective,
1083        "x-hub-signature-256",
1084        "X-Hub-Signature-256",
1085    );
1086    effective
1087}
1088
1089fn canonicalize_header(
1090    source: &BTreeMap<String, String>,
1091    target: &mut BTreeMap<String, String>,
1092    lookup_name: &str,
1093    canonical_name: &str,
1094) {
1095    if let Some(value) = header_value(source, lookup_name) {
1096        target
1097            .entry(canonical_name.to_string())
1098            .or_insert_with(|| value.to_string());
1099    }
1100}
1101
1102fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
1103    headers
1104        .iter()
1105        .find(|(key, _)| key.eq_ignore_ascii_case(name))
1106        .map(|(_, value)| value.as_str())
1107}
1108
1109fn parse_secret_id(raw: Option<&str>) -> Option<SecretId> {
1110    let trimmed = raw?.trim();
1111    if trimmed.is_empty() {
1112        return None;
1113    }
1114    let (base, version) = match trimmed.rsplit_once('@') {
1115        Some((base, version_text)) => {
1116            let version = version_text.parse::<u64>().ok()?;
1117            (base, SecretVersion::Exact(version))
1118        }
1119        None => (trimmed, SecretVersion::Latest),
1120    };
1121    let (namespace, name) = base.split_once('/')?;
1122    if namespace.is_empty() || name.is_empty() {
1123        return None;
1124    }
1125    Some(SecretId::new(namespace, name).with_version(version))
1126}
1127
1128fn load_secret_text_blocking(
1129    ctx: &ConnectorCtx,
1130    secret_id: &SecretId,
1131) -> Result<String, ConnectorError> {
1132    let secret = futures::executor::block_on(ctx.secrets.get(secret_id))?;
1133    secret.with_exposed(|bytes| {
1134        std::str::from_utf8(bytes)
1135            .map(|value| value.to_string())
1136            .map_err(|error| {
1137                ConnectorError::Secret(format!(
1138                    "github secret `{secret_id}` is not valid UTF-8: {error}"
1139                ))
1140            })
1141    })
1142}
1143
1144fn fallback_body_digest(body: &[u8]) -> String {
1145    let digest = Sha256::digest(body);
1146    format!("sha256:{}", hex::encode(digest))
1147}
1148
1149fn parse_repo_ref(input: &str) -> Result<RepoRef, ClientError> {
1150    let trimmed = input.trim().trim_matches('/');
1151    if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1152        let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1153        if parts.len() >= 2 {
1154            return Ok(RepoRef {
1155                owner: parts[0].to_string(),
1156                repo: parts[1].to_string(),
1157            });
1158        }
1159    }
1160    if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1161        let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1162        if parts.len() >= 2 {
1163            return Ok(RepoRef {
1164                owner: parts[0].to_string(),
1165                repo: parts[1].to_string(),
1166            });
1167        }
1168    }
1169    if let Some((owner, repo)) = trimmed.split_once('/') {
1170        if !owner.is_empty() && !repo.is_empty() {
1171            return Ok(RepoRef {
1172                owner: owner.to_string(),
1173                repo: repo.to_string(),
1174            });
1175        }
1176    }
1177    Err(ClientError::InvalidArgs(format!(
1178        "invalid GitHub repository reference `{input}`"
1179    )))
1180}
1181
1182fn parse_issue_like_url(input: &str, expected_kind: &str) -> Result<IssueRef, ClientError> {
1183    let trimmed = input.trim().trim_matches('/');
1184    if let Some(rest) = trimmed.strip_prefix("https://github.com/") {
1185        let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1186        if parts.len() >= 4 {
1187            let kind = parts[2];
1188            if kind == expected_kind || (expected_kind == "issue" && kind == "issues") {
1189                return Ok(IssueRef {
1190                    repo: RepoRef {
1191                        owner: parts[0].to_string(),
1192                        repo: parts[1].to_string(),
1193                    },
1194                    number: parts[3].parse().map_err(|error| {
1195                        ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1196                    })?,
1197                });
1198            }
1199        }
1200    }
1201    if let Some(rest) = trimmed.strip_prefix("https://api.github.com/repos/") {
1202        let parts: Vec<&str> = rest.trim_matches('/').split('/').collect();
1203        if parts.len() >= 4 {
1204            let kind = parts[2];
1205            if kind == expected_kind
1206                || (expected_kind == "issue" && kind == "issues")
1207                || (expected_kind == "pull" && kind == "pulls")
1208            {
1209                return Ok(IssueRef {
1210                    repo: RepoRef {
1211                        owner: parts[0].to_string(),
1212                        repo: parts[1].to_string(),
1213                    },
1214                    number: parts[3].parse().map_err(|error| {
1215                        ClientError::InvalidArgs(format!("invalid GitHub URL `{input}`: {error}"))
1216                    })?,
1217                });
1218            }
1219        }
1220    }
1221    Err(ClientError::InvalidArgs(format!(
1222        "invalid GitHub {expected_kind} URL `{input}`"
1223    )))
1224}
1225
1226fn absolute_api_url(base_url: &str, path: &str) -> Result<String, ClientError> {
1227    let base = Url::parse(base_url).map_err(|error| ClientError::InvalidArgs(error.to_string()))?;
1228    base.join(path.trim_start_matches('/'))
1229        .map(|value| value.to_string())
1230        .map_err(|error| ClientError::InvalidArgs(error.to_string()))
1231}
1232
1233fn installation_token_refresh_at(expires_at: Option<&str>, now: OffsetDateTime) -> OffsetDateTime {
1234    let eager_refresh = now + Duration::minutes(55);
1235    let Some(expires_at) = expires_at else {
1236        return eager_refresh;
1237    };
1238    let parsed =
1239        OffsetDateTime::parse(expires_at, &time::format_description::well_known::Rfc3339).ok();
1240    parsed
1241        .map(|value| value - Duration::minutes(5))
1242        .map(|value| {
1243            if value < eager_refresh {
1244                value
1245            } else {
1246                eager_refresh
1247            }
1248        })
1249        .unwrap_or(eager_refresh)
1250}
1251
1252fn is_rate_limited(response: &Response) -> bool {
1253    if is_rate_limited_status(response.status()) {
1254        return true;
1255    }
1256    response
1257        .headers()
1258        .get("x-ratelimit-remaining")
1259        .and_then(|value| value.to_str().ok())
1260        .map(|value| value == "0")
1261        .unwrap_or(false)
1262}
1263
1264fn is_rate_limited_status(status: StatusCode) -> bool {
1265    status == StatusCode::TOO_MANY_REQUESTS || status == StatusCode::FORBIDDEN
1266}
1267
1268fn rate_limit_backoff(response: &Response) -> std::time::Duration {
1269    if let Some(delay) = response
1270        .headers()
1271        .get("retry-after")
1272        .and_then(|value| value.to_str().ok())
1273        .and_then(|value| value.parse::<u64>().ok())
1274    {
1275        return std::time::Duration::from_secs(delay);
1276    }
1277    if let Some(reset_at) = response
1278        .headers()
1279        .get("x-ratelimit-reset")
1280        .and_then(|value| value.to_str().ok())
1281        .and_then(|value| value.parse::<i64>().ok())
1282        .and_then(|value| OffsetDateTime::from_unix_timestamp(value).ok())
1283    {
1284        let now = OffsetDateTime::now_utc();
1285        if reset_at > now {
1286            let delta = reset_at - now;
1287            return std::time::Duration::from_secs(delta.whole_seconds().max(0) as u64);
1288        }
1289    }
1290    std::time::Duration::from_millis(100)
1291}