pipedash_plugin_github/
plugin.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use futures::future::join_all;
5use octocrab::Octocrab;
6use pipedash_plugin_api::*;
7
8use crate::{
9    client,
10    config,
11    metadata,
12};
13
14pub struct GitHubPlugin {
15    metadata: PluginMetadata,
16    client: Option<client::GitHubClient>,
17    provider_id: Option<i64>,
18    config: HashMap<String, String>,
19}
20
21impl Default for GitHubPlugin {
22    fn default() -> Self {
23        Self::new()
24    }
25}
26
/// Translation between plugin feature IDs and the permission names that apply
/// to the token type in use.
mod permission_mapping {

    /// `(feature_id, permission)` pairs used for fine-grained tokens.
    pub const FINE_GRAINED_FEATURE_MAPPINGS: &[(&str, &str)] = &[
        ("list_monitor_workflows", "Repository Metadata"),
        ("view_run_history", "Repository Metadata"),
        ("monitor_status", "Actions (Read)"),
        ("trigger_dispatch", "Actions (Write)"),
        ("cancel_workflows", "Actions (Write)"),
        ("access_org_repos", "Organization members and teams (Read)"),
    ];

    /// Returns the permissions a feature requires for the given token type.
    ///
    /// * `"fine_grained"` tokens: the single permission listed for
    ///   `feature_id` in [`FINE_GRAINED_FEATURE_MAPPINGS`], or an empty list
    ///   when the feature has no entry.
    /// * any other token type: a copy of `classic_permissions`.
    pub fn map_feature_permissions(
        token_type: &str, feature_id: &str, classic_permissions: &[String],
    ) -> Vec<String> {
        if token_type != "fine_grained" {
            return classic_permissions.to_vec();
        }

        // First matching entry wins; the table is tiny, so a linear scan is fine.
        for &(id, permission) in FINE_GRAINED_FEATURE_MAPPINGS {
            if id == feature_id {
                return vec![permission.to_string()];
            }
        }

        Vec::new()
    }
}
51
52impl GitHubPlugin {
53    pub fn new() -> Self {
54        Self {
55            metadata: metadata::create_metadata(),
56            client: None,
57            provider_id: None,
58            config: HashMap::new(),
59        }
60    }
61
62    fn client(&self) -> PluginResult<&client::GitHubClient> {
63        self.client
64            .as_ref()
65            .ok_or_else(|| PluginError::Internal("Plugin not initialized".to_string()))
66    }
67}
68
69#[async_trait]
70impl Plugin for GitHubPlugin {
71    fn metadata(&self) -> &PluginMetadata {
72        &self.metadata
73    }
74
75    fn initialize(
76        &mut self, provider_id: i64, config: HashMap<String, String>,
77        _http_client: Option<std::sync::Arc<reqwest::Client>>,
78    ) -> PluginResult<()> {
79        let token = config
80            .get("token")
81            .ok_or_else(|| PluginError::InvalidConfig("Missing GitHub token".to_string()))?;
82
83        if token.is_empty() {
84            tracing::error!(provider_id = provider_id, "GitHub token is empty");
85            return Err(PluginError::InvalidConfig(
86                "GitHub token is empty. Please check keyring permissions.".to_string(),
87            ));
88        }
89
90        tracing::debug!(token_length = token.len(), "Initializing GitHub plugin");
91
92        let base_url = config::get_base_url(&config);
93        let api_url = config::build_api_url(&base_url);
94
95        tracing::debug!(api_url = %api_url, "Using GitHub API URL");
96
97        let octocrab = Octocrab::builder()
98            .personal_token(token.clone())
99            .base_uri(&api_url)
100            .map_err(|e| PluginError::InvalidConfig(format!("Failed to set base URI: {e}")))?
101            .build()
102            .map_err(|e| {
103                PluginError::InvalidConfig(format!("Failed to build GitHub client: {e}"))
104            })?;
105
106        let github_client = client::GitHubClient::new(octocrab, token.clone())?;
107        self.client = Some(github_client);
108        self.provider_id = Some(provider_id);
109        self.config = config;
110
111        Ok(())
112    }
113
114    async fn validate_credentials(&self) -> PluginResult<bool> {
115        let client = self.client()?;
116
117        client
118            .retry_policy
119            .retry(|| async {
120                let result = client.octocrab.current().user().await;
121
122                match result {
123                    Ok(_) => Ok(true),
124                    Err(e) => {
125                        if e.to_string().contains("401") {
126                            Err(PluginError::AuthenticationFailed(
127                                "Invalid GitHub token".to_string(),
128                            ))
129                        } else {
130                            Err(PluginError::ApiError(format!(
131                                "Failed to validate credentials: {e}"
132                            )))
133                        }
134                    }
135                }
136            })
137            .await
138    }
139
140    async fn fetch_available_pipelines(
141        &self, params: Option<PaginationParams>,
142    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
143        let client = self.client()?;
144        client.fetch_all_repositories(params).await
145    }
146
147    async fn fetch_organizations(&self) -> PluginResult<Vec<pipedash_plugin_api::Organization>> {
148        let client = self.client()?;
149        client.fetch_organizations().await
150    }
151
152    async fn fetch_available_pipelines_filtered(
153        &self, org: Option<String>, search: Option<String>, params: Option<PaginationParams>,
154    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
155        let client = self.client()?;
156        client
157            .fetch_available_pipelines_filtered(org, search, params)
158            .await
159    }
160
161    async fn fetch_pipelines(&self) -> PluginResult<Vec<Pipeline>> {
162        let provider_id = self
163            .provider_id
164            .ok_or_else(|| PluginError::Internal("Provider ID not set".to_string()))?;
165
166        let repositories = config::get_repositories(&self.config);
167        tracing::debug!(repositories = ?repositories, "Configured GitHub repositories");
168
169        if repositories.is_empty() {
170            return Err(PluginError::InvalidConfig(
171                "No repositories configured".to_string(),
172            ));
173        }
174
175        let client = self.client()?;
176        let futures = repositories
177            .into_iter()
178            .map(|repo_full_name| client.fetch_repo_workflows(provider_id, repo_full_name));
179
180        let results = join_all(futures).await;
181
182        let mut all_pipelines = Vec::new();
183        let mut errors = Vec::new();
184
185        for result in results {
186            match result {
187                Ok(mut pipelines) => {
188                    tracing::debug!(count = pipelines.len(), "GitHub repo returned workflows");
189                    all_pipelines.append(&mut pipelines);
190                }
191                Err(e) => errors.push(e),
192            }
193        }
194
195        let unique_count = all_pipelines
196            .iter()
197            .map(|p| &p.id)
198            .collect::<std::collections::HashSet<_>>()
199            .len();
200        tracing::debug!(
201            unique_pipelines = unique_count,
202            total_pipelines = all_pipelines.len(),
203            "GitHub pipeline fetch complete"
204        );
205
206        if !errors.is_empty() && all_pipelines.is_empty() {
207            return Err(errors.into_iter().next().unwrap());
208        }
209
210        Ok(all_pipelines)
211    }
212
213    async fn fetch_run_history(
214        &self, pipeline_id: &str, limit: usize,
215    ) -> PluginResult<Vec<PipelineRun>> {
216        let parts: Vec<&str> = pipeline_id.split("__").collect();
217        if parts.len() != 5 {
218            return Err(PluginError::InvalidConfig(format!(
219                "Invalid pipeline ID format: {} (expected 5 parts, got {})",
220                pipeline_id,
221                parts.len()
222            )));
223        }
224
225        let owner = parts[2];
226        let repo = parts[3];
227        let workflow_id_str = parts[4];
228        let workflow_id: u64 = workflow_id_str.parse().map_err(|_| {
229            PluginError::InvalidConfig(format!("Invalid workflow ID: {workflow_id_str}"))
230        })?;
231
232        let client = self.client()?;
233        let runs = client
234            .fetch_run_history(owner, repo, workflow_id, limit)
235            .await?;
236
237        let pipeline_runs = runs
238            .into_iter()
239            .map(|run| client::run_to_pipeline_run(run, pipeline_id))
240            .collect();
241
242        Ok(pipeline_runs)
243    }
244
245    async fn fetch_run_details(
246        &self, pipeline_id: &str, run_number: i64,
247    ) -> PluginResult<PipelineRun> {
248        let parts: Vec<&str> = pipeline_id.split("__").collect();
249        if parts.len() != 5 {
250            return Err(PluginError::InvalidConfig(format!(
251                "Invalid pipeline ID format: {} (expected 5 parts, got {})",
252                pipeline_id,
253                parts.len()
254            )));
255        }
256
257        let owner = parts[2];
258        let repo = parts[3];
259        let workflow_id_str = parts[4];
260        let workflow_id: u64 = workflow_id_str.parse().map_err(|_| {
261            PluginError::InvalidConfig(format!("Invalid workflow ID: {workflow_id_str}"))
262        })?;
263
264        let client = self.client()?;
265        let run = client
266            .fetch_run_by_number(owner, repo, workflow_id, run_number)
267            .await?;
268
269        Ok(client::run_to_pipeline_run(run, pipeline_id))
270    }
271
272    async fn fetch_workflow_parameters(
273        &self, _workflow_id: &str,
274    ) -> PluginResult<Vec<WorkflowParameter>> {
275        Ok(vec![WorkflowParameter {
276            name: "ref".to_string(),
277            label: Some("Ref".to_string()),
278            description: Some("Branch, tag, or commit SHA to run workflow on".to_string()),
279            param_type: WorkflowParameterType::String {
280                default: Some("main".to_string()),
281            },
282            required: true,
283        }])
284    }
285
286    async fn trigger_pipeline(&self, params: TriggerParams) -> PluginResult<String> {
287        let parts: Vec<&str> = params.workflow_id.split("__").collect();
288        if parts.len() != 5 {
289            return Err(PluginError::InvalidConfig(format!(
290                "Invalid workflow ID format: {} (expected 5 parts, got {})",
291                params.workflow_id,
292                parts.len()
293            )));
294        }
295
296        let owner = parts[2];
297        let repo = parts[3];
298        let workflow_id = parts[4];
299
300        let ref_value = params
301            .inputs
302            .as_ref()
303            .and_then(|inputs| inputs.get("ref"))
304            .and_then(|v| v.as_str())
305            .unwrap_or("main")
306            .to_string();
307
308        let mut body = serde_json::json!({
309            "ref": &ref_value,
310        });
311
312        if let Some(inputs) = params.inputs {
313            if let Some(obj) = inputs.as_object() {
314                let workflow_inputs: serde_json::Map<String, serde_json::Value> = obj
315                    .iter()
316                    .filter(|(k, _)| k.as_str() != "ref")
317                    .map(|(k, v)| (k.clone(), v.clone()))
318                    .collect();
319
320                if !workflow_inputs.is_empty() {
321                    body["inputs"] = serde_json::Value::Object(workflow_inputs);
322                }
323            }
324        }
325
326        let client = self.client()?;
327        let url = format!("/repos/{owner}/{repo}/actions/workflows/{workflow_id}/dispatches");
328
329        let response: Result<serde_json::Value, octocrab::Error> =
330            client.octocrab.post(url, Some(&body)).await;
331
332        if let Err(e) = response {
333            return Err(PluginError::ApiError(format!(
334                "Failed to trigger workflow: {e}"
335            )));
336        }
337
338        let workflow_id_u64: u64 = workflow_id.parse().map_err(|_| {
339            PluginError::InvalidConfig(format!("Invalid workflow ID: {workflow_id}"))
340        })?;
341
342        let trigger_time = chrono::Utc::now();
343
344        let previous_runs = client
345            .fetch_run_history(owner, repo, workflow_id_u64, 5)
346            .await?;
347        let previous_latest_run_number = previous_runs.first().map(|r| r.run_number).unwrap_or(0);
348
349        let mut new_run = None;
350        for attempt in 1..=10 {
351            tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
352
353            let runs = client
354                .fetch_run_history(owner, repo, workflow_id_u64, 5)
355                .await?;
356
357            for run in runs {
358                if run.run_number > previous_latest_run_number
359                    && run.head_branch == ref_value
360                    && run.created_at.with_timezone(&chrono::Utc) >= trigger_time
361                {
362                    tracing::debug!(
363                        run_number = run.run_number,
364                        attempt = attempt,
365                        "Found new GitHub run"
366                    );
367                    new_run = Some(run);
368                    break;
369                }
370            }
371
372            if new_run.is_some() {
373                break;
374            }
375
376            tracing::debug!(
377                attempt = attempt,
378                max_attempts = 10,
379                "Waiting for new GitHub run to appear"
380            );
381        }
382
383        let (logs_url, run_number) = if let Some(run) = new_run {
384            (run.html_url.to_string(), run.run_number)
385        } else {
386            let runs = client
387                .fetch_run_history(owner, repo, workflow_id_u64, 1)
388                .await?;
389            (
390                runs.first()
391                    .map(|r| r.html_url.to_string())
392                    .unwrap_or_default(),
393                runs.first().map(|r| r.run_number).unwrap_or(0),
394            )
395        };
396
397        Ok(serde_json::json!({
398            "message": format!("Triggered workflow on ref {}", ref_value),
399            "run_number": run_number,
400            "logs_url": logs_url
401        })
402        .to_string())
403    }
404
405    async fn cancel_run(&self, pipeline_id: &str, run_number: i64) -> PluginResult<()> {
406        let parts: Vec<&str> = pipeline_id.split("__").collect();
407        if parts.len() != 5 {
408            return Err(PluginError::InvalidConfig(format!(
409                "Invalid pipeline ID format: {} (expected 5 parts, got {})",
410                pipeline_id,
411                parts.len()
412            )));
413        }
414
415        let owner = parts[2];
416        let repo = parts[3];
417        let workflow_id_str = parts[4];
418        let workflow_id: u64 = workflow_id_str.parse().map_err(|_| {
419            PluginError::InvalidConfig(format!("Invalid workflow ID: {workflow_id_str}"))
420        })?;
421
422        let client = self.client()?;
423        let run = client
424            .fetch_run_by_number(owner, repo, workflow_id, run_number)
425            .await?;
426
427        let run_id_u64: u64 = run.id.0;
428        client.cancel_run(owner, repo, run_id_u64).await
429    }
430    async fn check_permissions(&self) -> PluginResult<PermissionStatus> {
431        let client = self.client()?;
432        client.check_token_permissions().await
433    }
434
435    fn get_feature_availability(&self, status: &PermissionStatus) -> Vec<FeatureAvailability> {
436        use permission_mapping::map_feature_permissions;
437
438        let features = &self.metadata().features;
439
440        let token_type = status
441            .metadata
442            .get("token_type")
443            .map(|s| s.as_str())
444            .unwrap_or("classic_pat");
445
446        let granted_perms: std::collections::HashSet<String> = status
447            .permissions
448            .iter()
449            .filter(|p| p.granted)
450            .map(|p| p.permission.name.clone())
451            .collect();
452
453        features
454            .iter()
455            .map(|feature| {
456                let mapped_required =
457                    map_feature_permissions(token_type, &feature.id, &feature.required_permissions);
458
459                let missing: Vec<String> = mapped_required
460                    .iter()
461                    .filter(|perm| !granted_perms.contains(*perm))
462                    .cloned()
463                    .collect();
464
465                let transformed_feature = Feature {
466                    id: feature.id.clone(),
467                    name: feature.name.clone(),
468                    description: feature.description.clone(),
469                    required_permissions: mapped_required,
470                };
471
472                FeatureAvailability {
473                    feature: transformed_feature,
474                    available: missing.is_empty(),
475                    missing_permissions: missing,
476                }
477            })
478            .collect()
479    }
480}