1use std::collections::HashMap;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use futures::future::join_all;
6use pipedash_plugin_api::*;
7
8use crate::{
9 client,
10 config,
11 mapper,
12 metadata,
13 types,
14};
15
16pub struct GitLabPlugin {
17 metadata: PluginMetadata,
18 client: Option<client::GitLabClient>,
19 provider_id: Option<i64>,
20 config: HashMap<String, String>,
21}
22
23impl Default for GitLabPlugin {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29impl GitLabPlugin {
30 pub fn new() -> Self {
31 Self {
32 metadata: metadata::create_metadata(),
33 client: None,
34 provider_id: None,
35 config: HashMap::new(),
36 }
37 }
38
39 fn client(&self) -> PluginResult<&client::GitLabClient> {
40 self.client
41 .as_ref()
42 .ok_or_else(|| PluginError::Internal("Plugin not initialized".to_string()))
43 }
44
45 async fn fetch_all_projects(&self) -> PluginResult<Vec<types::Project>> {
46 let client = self.client()?;
47 let mut all_projects = Vec::new();
48 let mut page = 1;
49
50 loop {
51 let params = PaginationParams {
52 page,
53 page_size: 100,
54 };
55 let response = client.list_projects(¶ms).await?;
56 if response.items.is_empty() {
57 break;
58 }
59 all_projects.extend(response.items);
60
61 if !response.has_more || page > 100 {
62 break;
63 }
64 page += 1;
65 }
66
67 if let Some(selected_paths) = config::parse_selected_items(&self.config) {
68 Ok(all_projects
69 .into_iter()
70 .filter(|p| {
71 let normalized = p.name_with_namespace.replace(" ", "");
72 selected_paths.contains(&normalized)
73 })
74 .collect())
75 } else {
76 Ok(all_projects)
77 }
78 }
79
80 async fn fetch_projects_paginated(
81 &self, params: &PaginationParams,
82 ) -> PluginResult<PaginatedResponse<types::Project>> {
83 let client = self.client()?;
84 let response = client.list_projects(params).await?;
85
86 if let Some(selected_paths) = config::parse_selected_items(&self.config) {
87 let filtered_items: Vec<_> = response
88 .items
89 .into_iter()
90 .filter(|p| {
91 let normalized = p.name_with_namespace.replace(" ", "");
92 selected_paths.contains(&normalized)
93 })
94 .collect();
95 let filtered_count = filtered_items.len();
96 Ok(PaginatedResponse::new(
97 filtered_items,
98 params.page,
99 params.page_size,
100 filtered_count,
101 ))
102 } else {
103 Ok(response)
104 }
105 }
106}
107
108#[async_trait]
109impl Plugin for GitLabPlugin {
110 fn metadata(&self) -> &PluginMetadata {
111 &self.metadata
112 }
113
114 fn provider_type(&self) -> &str {
115 "gitlab"
116 }
117
118 fn initialize(
119 &mut self, provider_id: i64, config: HashMap<String, String>,
120 http_client: Option<std::sync::Arc<reqwest::Client>>,
121 ) -> PluginResult<()> {
122 let token = config
123 .get("token")
124 .ok_or_else(|| PluginError::InvalidConfig("Missing GitLab access token".to_string()))?
125 .to_string();
126
127 let base_url = config::get_base_url(&config);
128 let api_url = config::build_api_url(&base_url);
129
130 let client = http_client.unwrap_or_else(|| {
131 std::sync::Arc::new(
132 reqwest::Client::builder()
133 .use_rustls_tls()
134 .pool_max_idle_per_host(10)
135 .timeout(Duration::from_secs(30))
136 .connect_timeout(Duration::from_secs(10))
137 .tcp_keepalive(Duration::from_secs(60))
138 .build()
139 .expect("Failed to build HTTP client"),
140 )
141 });
142
143 self.client = Some(client::GitLabClient::new(client, api_url, token));
144 self.provider_id = Some(provider_id);
145 self.config = config;
146
147 Ok(())
148 }
149
150 async fn validate_credentials(&self) -> PluginResult<bool> {
151 let client = self.client()?;
152 client.get_user().await?;
153 Ok(true)
154 }
155
156 async fn fetch_available_pipelines(
157 &self, params: Option<PaginationParams>,
158 ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
159 let params = params.unwrap_or_default();
160 let response = self.fetch_projects_paginated(¶ms).await?;
161
162 let available_pipelines = response
163 .items
164 .iter()
165 .map(mapper::map_available_pipeline)
166 .collect();
167
168 Ok(PaginatedResponse::new(
169 available_pipelines,
170 response.page,
171 response.page_size,
172 response.total_count,
173 ))
174 }
175
176 async fn fetch_organizations(&self) -> PluginResult<Vec<pipedash_plugin_api::Organization>> {
177 let client = self.client()?;
178 client.list_groups().await
179 }
180
181 async fn fetch_available_pipelines_filtered(
182 &self, org: Option<String>, search: Option<String>, params: Option<PaginationParams>,
183 ) -> PluginResult<PaginatedResponse<AvailablePipeline>> {
184 let params = params.unwrap_or_default();
185 let client = self.client()?;
186 let response = client.list_projects_filtered(org, search, ¶ms).await?;
187
188 let available_pipelines = response
189 .items
190 .iter()
191 .map(mapper::map_available_pipeline)
192 .collect();
193
194 Ok(PaginatedResponse::new(
195 available_pipelines,
196 response.page,
197 response.page_size,
198 response.total_count,
199 ))
200 }
201
202 async fn fetch_pipelines(&self) -> PluginResult<Vec<Pipeline>> {
203 let provider_id = self
204 .provider_id
205 .ok_or_else(|| PluginError::Internal("Provider ID not set".to_string()))?;
206
207 let client = self.client()?;
208 let projects = self.fetch_all_projects().await?;
209
210 let pipeline_futures = projects.iter().map(|project| async move {
211 let pipelines = client.get_project_pipelines(project.id, 1).await.ok()?;
212 let latest_pipeline = pipelines.first();
213 Some(mapper::map_pipeline(project, latest_pipeline, provider_id))
214 });
215
216 let results: Vec<Option<Pipeline>> = join_all(pipeline_futures).await;
217 Ok(results.into_iter().flatten().collect())
218 }
219
220 async fn fetch_run_history(
221 &self, pipeline_id: &str, limit: usize,
222 ) -> PluginResult<Vec<PipelineRun>> {
223 let (provider_id, project_id) = config::parse_pipeline_id(pipeline_id)?;
224 let client = self.client()?;
225
226 let project = client.get_project(project_id).await?;
227
228 let pipeline_list = client.get_project_pipelines(project_id, limit).await?;
229
230 let parts: Vec<&str> = project.name_with_namespace.split('/').collect();
231 let namespace = if parts.len() >= 2 {
232 Some(parts[..parts.len() - 1].join("/"))
233 } else {
234 None
235 };
236
237 let detailed_pipeline_futures = pipeline_list
238 .iter()
239 .map(|p| async move { client.get_pipeline(project_id, p.id).await });
240
241 let detailed_pipelines = join_all(detailed_pipeline_futures).await;
242
243 Ok(detailed_pipelines
244 .into_iter()
245 .filter_map(|result| result.ok())
246 .map(|p| mapper::map_pipeline_run(&p, project_id, provider_id, namespace.as_deref()))
247 .collect())
248 }
249
250 async fn fetch_run_details(
251 &self, pipeline_id: &str, run_number: i64,
252 ) -> PluginResult<PipelineRun> {
253 let (provider_id, project_id) = config::parse_pipeline_id(pipeline_id)?;
254 let client = self.client()?;
255
256 let project = client.get_project(project_id).await?;
257 let pipeline = client.get_pipeline(project_id, run_number).await?;
258
259 let parts: Vec<&str> = project.name_with_namespace.split('/').collect();
260 let namespace = if parts.len() >= 2 {
261 Some(parts[..parts.len() - 1].join("/"))
262 } else {
263 None
264 };
265
266 Ok(mapper::map_pipeline_run(
267 &pipeline,
268 project_id,
269 provider_id,
270 namespace.as_deref(),
271 ))
272 }
273
274 async fn fetch_workflow_parameters(
275 &self, _workflow_id: &str,
276 ) -> PluginResult<Vec<WorkflowParameter>> {
277 Ok(vec![WorkflowParameter {
278 name: "ref".to_string(),
279 label: Some("Ref".to_string()),
280 description: Some("Branch, tag, or commit SHA to run pipeline on".to_string()),
281 param_type: WorkflowParameterType::String {
282 default: Some("main".to_string()),
283 },
284 required: true,
285 }])
286 }
287
288 async fn trigger_pipeline(&self, params: TriggerParams) -> PluginResult<String> {
289 let (_provider_id, project_id) = config::parse_pipeline_id(¶ms.workflow_id)?;
290
291 let client = self.client()?;
292
293 let ref_name = params
294 .inputs
295 .as_ref()
296 .and_then(|inputs| inputs.get("ref"))
297 .and_then(|v| v.as_str())
298 .unwrap_or("main")
299 .to_string();
300
301 let variables = params.inputs.as_ref().and_then(|inputs| {
302 inputs.get("variables").and_then(|vars| {
303 vars.as_object().map(|obj| {
304 obj.iter()
305 .map(|(key, value)| types::PipelineVariable {
306 key: key.clone(),
307 value: value.as_str().unwrap_or("").to_string(),
308 variable_type: Some("env_var".to_string()),
309 })
310 .collect()
311 })
312 })
313 });
314
315 let pipeline = client
316 .trigger_pipeline(project_id, ref_name, variables)
317 .await?;
318 Ok(pipeline.web_url)
319 }
320
321 async fn cancel_run(&self, pipeline_id: &str, run_number: i64) -> PluginResult<()> {
322 let (_, project_id) = config::parse_pipeline_id(pipeline_id)?;
323 let client = self.client()?;
324 client.cancel_pipeline(project_id, run_number).await?;
325 Ok(())
326 }
327
328 async fn fetch_agents(&self) -> PluginResult<Vec<BuildAgent>> {
329 Err(PluginError::NotSupported(
330 "GitLab runners monitoring not implemented".to_string(),
331 ))
332 }
333
334 async fn fetch_artifacts(&self, _run_id: &str) -> PluginResult<Vec<BuildArtifact>> {
335 Err(PluginError::NotSupported(
336 "Artifacts download not implemented".to_string(),
337 ))
338 }
339
340 async fn fetch_queues(&self) -> PluginResult<Vec<BuildQueue>> {
341 Err(PluginError::NotSupported(
342 "Build queues not supported by GitLab".to_string(),
343 ))
344 }
345
346 fn get_migrations(&self) -> Vec<String> {
347 vec![]
348 }
349}