pipedash_plugin_gitlab/
plugin.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use futures::future::join_all;
6use pipedash_plugin_api::*;
7
8use crate::{
9    client,
10    config,
11    mapper,
12    metadata,
13    types,
14};
15
16pub struct GitLabPlugin {
17    metadata: PluginMetadata,
18    client: Option<client::GitLabClient>,
19    provider_id: Option<i64>,
20    config: HashMap<String, String>,
21}
22
23impl Default for GitLabPlugin {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl GitLabPlugin {
30    pub fn new() -> Self {
31        Self {
32            metadata: metadata::create_metadata(),
33            client: None,
34            provider_id: None,
35            config: HashMap::new(),
36        }
37    }
38
39    fn client(&self) -> PluginResult<&client::GitLabClient> {
40        self.client
41            .as_ref()
42            .ok_or_else(|| PluginError::Internal("Plugin not initialized".to_string()))
43    }
44
45    async fn fetch_all_projects(&self) -> PluginResult<Vec<types::Project>> {
46        let client = self.client()?;
47        let mut all_projects = Vec::new();
48        let mut page = 1;
49
50        loop {
51            let params = PaginationParams {
52                page,
53                page_size: 100,
54            };
55            let response = client.list_projects(&params).await?;
56            if response.items.is_empty() {
57                break;
58            }
59            all_projects.extend(response.items);
60
61            if !response.has_more || page > 100 {
62                break;
63            }
64            page += 1;
65        }
66
67        if let Some(selected_paths) = config::parse_selected_items(&self.config) {
68            Ok(all_projects
69                .into_iter()
70                .filter(|p| {
71                    let normalized = p.name_with_namespace.replace(" ", "");
72                    selected_paths.contains(&normalized)
73                })
74                .collect())
75        } else {
76            Ok(all_projects)
77        }
78    }
79
80    async fn fetch_projects_paginated(
81        &self, params: &PaginationParams,
82    ) -> PluginResult<PaginatedResponse<types::Project>> {
83        let client = self.client()?;
84        let response = client.list_projects(params).await?;
85
86        if let Some(selected_paths) = config::parse_selected_items(&self.config) {
87            let filtered_items: Vec<_> = response
88                .items
89                .into_iter()
90                .filter(|p| {
91                    let normalized = p.name_with_namespace.replace(" ", "");
92                    selected_paths.contains(&normalized)
93                })
94                .collect();
95            let filtered_count = filtered_items.len();
96            Ok(PaginatedResponse::new(
97                filtered_items,
98                params.page,
99                params.page_size,
100                filtered_count,
101            ))
102        } else {
103            Ok(response)
104        }
105    }
106}
107
108#[async_trait]
109impl Plugin for GitLabPlugin {
110    fn metadata(&self) -> &PluginMetadata {
111        &self.metadata
112    }
113
114    fn provider_type(&self) -> &str {
115        "gitlab"
116    }
117
118    fn initialize(
119        &mut self, provider_id: i64, config: HashMap<String, String>,
120        http_client: Option<std::sync::Arc<reqwest::Client>>,
121    ) -> PluginResult<()> {
122        let token = config
123            .get("token")
124            .ok_or_else(|| PluginError::InvalidConfig("Missing GitLab access token".to_string()))?
125            .to_string();
126
127        let base_url = config::get_base_url(&config);
128        let api_url = config::build_api_url(&base_url);
129
130        let client = http_client.unwrap_or_else(|| {
131            std::sync::Arc::new(
132                reqwest::Client::builder()
133                    .use_rustls_tls()
134                    .pool_max_idle_per_host(10)
135                    .timeout(Duration::from_secs(30))
136                    .connect_timeout(Duration::from_secs(10))
137                    .tcp_keepalive(Duration::from_secs(60))
138                    .build()
139                    .expect("Failed to build HTTP client"),
140            )
141        });
142
143        self.client = Some(client::GitLabClient::new(client, api_url, token));
144        self.provider_id = Some(provider_id);
145        self.config = config;
146
147        Ok(())
148    }
149
150    async fn validate_credentials(&self) -> PluginResult<bool> {
151        let client = self.client()?;
152        client.get_user().await?;
153        Ok(true)
154    }
155
156    async fn fetch_available_pipelines(
157        &self, params: Option<PaginationParams>,
158    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
159        let params = params.unwrap_or_default();
160        let response = self.fetch_projects_paginated(&params).await?;
161
162        let available_pipelines = response
163            .items
164            .iter()
165            .map(mapper::map_available_pipeline)
166            .collect();
167
168        Ok(PaginatedResponse::new(
169            available_pipelines,
170            response.page,
171            response.page_size,
172            response.total_count,
173        ))
174    }
175
176    async fn fetch_organizations(&self) -> PluginResult<Vec<pipedash_plugin_api::Organization>> {
177        let client = self.client()?;
178        client.list_groups().await
179    }
180
181    async fn fetch_available_pipelines_filtered(
182        &self, org: Option<String>, search: Option<String>, params: Option<PaginationParams>,
183    ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
184        let params = params.unwrap_or_default();
185        let client = self.client()?;
186        let response = client.list_projects_filtered(org, search, &params).await?;
187
188        let available_pipelines = response
189            .items
190            .iter()
191            .map(mapper::map_available_pipeline)
192            .collect();
193
194        Ok(PaginatedResponse::new(
195            available_pipelines,
196            response.page,
197            response.page_size,
198            response.total_count,
199        ))
200    }
201
202    async fn fetch_pipelines(&self) -> PluginResult<Vec<Pipeline>> {
203        let provider_id = self
204            .provider_id
205            .ok_or_else(|| PluginError::Internal("Provider ID not set".to_string()))?;
206
207        let client = self.client()?;
208        let projects = self.fetch_all_projects().await?;
209
210        let pipeline_futures = projects.iter().map(|project| async move {
211            let pipelines = client.get_project_pipelines(project.id, 1).await.ok()?;
212            let latest_pipeline = pipelines.first();
213            Some(mapper::map_pipeline(project, latest_pipeline, provider_id))
214        });
215
216        let results: Vec<Option<Pipeline>> = join_all(pipeline_futures).await;
217        Ok(results.into_iter().flatten().collect())
218    }
219
220    async fn fetch_run_history(
221        &self, pipeline_id: &str, limit: usize,
222    ) -> PluginResult<Vec<PipelineRun>> {
223        let (provider_id, project_id) = config::parse_pipeline_id(pipeline_id)?;
224        let client = self.client()?;
225
226        let project = client.get_project(project_id).await?;
227
228        let pipeline_list = client.get_project_pipelines(project_id, limit).await?;
229
230        let parts: Vec<&str> = project.name_with_namespace.split('/').collect();
231        let namespace = if parts.len() >= 2 {
232            Some(parts[..parts.len() - 1].join("/"))
233        } else {
234            None
235        };
236
237        let detailed_pipeline_futures = pipeline_list
238            .iter()
239            .map(|p| async move { client.get_pipeline(project_id, p.id).await });
240
241        let detailed_pipelines = join_all(detailed_pipeline_futures).await;
242
243        Ok(detailed_pipelines
244            .into_iter()
245            .filter_map(|result| result.ok())
246            .map(|p| mapper::map_pipeline_run(&p, project_id, provider_id, namespace.as_deref()))
247            .collect())
248    }
249
250    async fn fetch_run_details(
251        &self, pipeline_id: &str, run_number: i64,
252    ) -> PluginResult<PipelineRun> {
253        let (provider_id, project_id) = config::parse_pipeline_id(pipeline_id)?;
254        let client = self.client()?;
255
256        let project = client.get_project(project_id).await?;
257        let pipeline = client.get_pipeline(project_id, run_number).await?;
258
259        let parts: Vec<&str> = project.name_with_namespace.split('/').collect();
260        let namespace = if parts.len() >= 2 {
261            Some(parts[..parts.len() - 1].join("/"))
262        } else {
263            None
264        };
265
266        Ok(mapper::map_pipeline_run(
267            &pipeline,
268            project_id,
269            provider_id,
270            namespace.as_deref(),
271        ))
272    }
273
274    async fn fetch_workflow_parameters(
275        &self, _workflow_id: &str,
276    ) -> PluginResult<Vec<WorkflowParameter>> {
277        Ok(vec![WorkflowParameter {
278            name: "ref".to_string(),
279            label: Some("Ref".to_string()),
280            description: Some("Branch, tag, or commit SHA to run pipeline on".to_string()),
281            param_type: WorkflowParameterType::String {
282                default: Some("main".to_string()),
283            },
284            required: true,
285        }])
286    }
287
288    async fn trigger_pipeline(&self, params: TriggerParams) -> PluginResult<String> {
289        let (_provider_id, project_id) = config::parse_pipeline_id(&params.workflow_id)?;
290
291        let client = self.client()?;
292
293        let ref_name = params
294            .inputs
295            .as_ref()
296            .and_then(|inputs| inputs.get("ref"))
297            .and_then(|v| v.as_str())
298            .unwrap_or("main")
299            .to_string();
300
301        let variables = params.inputs.as_ref().and_then(|inputs| {
302            inputs.get("variables").and_then(|vars| {
303                vars.as_object().map(|obj| {
304                    obj.iter()
305                        .map(|(key, value)| types::PipelineVariable {
306                            key: key.clone(),
307                            value: value.as_str().unwrap_or("").to_string(),
308                            variable_type: Some("env_var".to_string()),
309                        })
310                        .collect()
311                })
312            })
313        });
314
315        let pipeline = client
316            .trigger_pipeline(project_id, ref_name, variables)
317            .await?;
318        Ok(pipeline.web_url)
319    }
320
321    async fn cancel_run(&self, pipeline_id: &str, run_number: i64) -> PluginResult<()> {
322        let (_, project_id) = config::parse_pipeline_id(pipeline_id)?;
323        let client = self.client()?;
324        client.cancel_pipeline(project_id, run_number).await?;
325        Ok(())
326    }
327
328    async fn fetch_agents(&self) -> PluginResult<Vec<BuildAgent>> {
329        Err(PluginError::NotSupported(
330            "GitLab runners monitoring not implemented".to_string(),
331        ))
332    }
333
334    async fn fetch_artifacts(&self, _run_id: &str) -> PluginResult<Vec<BuildArtifact>> {
335        Err(PluginError::NotSupported(
336            "Artifacts download not implemented".to_string(),
337        ))
338    }
339
340    async fn fetch_queues(&self) -> PluginResult<Vec<BuildQueue>> {
341        Err(PluginError::NotSupported(
342            "Build queues not supported by GitLab".to_string(),
343        ))
344    }
345
346    fn get_migrations(&self) -> Vec<String> {
347        vec![]
348    }
349}