1use anyhow::Error;
2use fluentci_secrets::{Provider, Vault, VaultConfig};
3use opentelemetry::{
4 global,
5 trace::{Span, TraceContextExt, Tracer, TracerProvider},
6 Context, KeyValue,
7};
8use owo_colors::OwoColorize;
9use std::sync::mpsc::{self, Sender};
10use std::sync::Arc;
11use std::{
12 collections::HashMap,
13 env::{self, current_dir},
14};
15use std::{path::Path, thread};
16use uuid::Uuid;
17
18use fluentci_ext::envhub::Envhub;
19use fluentci_ext::service::Service as ServiceExt;
20use fluentci_ext::Extension;
21use fluentci_types::{
22 nix::NixArgs,
23 process_compose::{self, Process},
24 secret::Secret,
25 Output,
26};
27
28use crate::get_hmac;
29
30use super::edge::Edge;
31use super::vertex::{Runnable, Vertex};
32
/// A volume tracked by the graph.
///
/// All fields are plain strings; `Default` yields a volume whose fields are
/// all empty.
#[derive(Clone, Debug, Default)]
pub struct Volume {
    /// Identifier of the volume.
    pub id: String,
    /// Label given to the volume when it was added.
    pub label: String,
    /// Path associated with the volume.
    pub path: String,
    /// Key of the volume (`Graph::execute` sets it to a copy of `path`).
    pub key: String,
}
40
41#[derive(Default, Clone)]
42pub struct Service {
43 pub id: String,
44 pub name: String,
45 pub vertices: Vec<Vertex>,
46 pub working_dir: String,
47}
48
49impl Into<Process> for Service {
50 fn into(self) -> Process {
51 Process {
52 command: self
53 .vertices
54 .iter()
55 .map(|v| v.runner.format_command(&v.command))
56 .collect::<Vec<String>>()
57 .join(" ; "),
58 depends_on: None,
59 working_dir: Some(self.working_dir),
60 ..Default::default()
61 }
62 }
63}
64
/// Commands accepted by `Graph::execute`.
pub enum GraphCommand {
    /// `(id, label, path)`: records a new volume; its key is set to `path`.
    AddVolume(String, String, String),
    /// `(id, label, command, needs, runner)`: adds a vertex, or, when a vertex
    /// with the same id already exists, appends `needs` to that vertex.
    AddVertex(
        String,
        String,
        String,
        Vec<String>,
        Arc<Box<dyn Extension + Send + Sync>>,
    ),
    /// `(key, value)`: sets an environment variable of the current process.
    AddEnvVariable(String, String),
    /// `(env_name, secret_id, secret_name)`: looks up a stored secret and
    /// exposes its plaintext as the environment variable `env_name`.
    AddSecretVariable(String, String, String),
    /// `(id)`: enables a previously registered service.
    EnableService(String),
    /// `(from, to)`: adds an edge between two vertices, given by index.
    AddEdge(usize, usize),
    /// Runs the whole graph with the given output selection.
    Execute(Output),
    /// `(id, provider)`: builds a vault from `provider` and stores it under `id`.
    AddSecretManager(String, Provider),
}
81
/// Pipeline graph: vertices (steps) connected by edges, plus the volumes,
/// services, vaults and secrets registered alongside them.
#[derive(Clone)]
pub struct Graph {
    /// Steps of the graph; edges refer to them by index into this vector.
    pub vertices: Vec<Vertex>,
    /// Directed edges between vertices (`from` index -> `to` index).
    pub edges: Vec<Edge>,
    /// Volumes added through `GraphCommand::AddVolume`.
    pub volumes: Vec<Volume>,
    /// Services built by `register_service`.
    pub services: Vec<Service>,
    /// Services selected through `GraphCommand::EnableService`; cleared by
    /// `post_execute`.
    pub enabled_services: Vec<Service>,
    /// Vaults keyed by secret-manager id.
    pub vaults: HashMap<String, Arc<Box<dyn Vault + Send + Sync>>>,
    /// Secret plaintexts keyed by the value `get_hmac(secret_id, name)` returns.
    pub secrets: HashMap<String, String>,
    /// Secret names keyed by secret id.
    pub secret_names: HashMap<String, String>,

    /// Channel used to report results: methods in this file send
    /// `(command, 1)` when a step fails and `(output, 0)` on success.
    tx: Sender<(String, usize)>,
    /// Extension handed to `Graph::new`.
    pub runner: Arc<Box<dyn Extension + Send + Sync>>,
    /// Directory in which vertices are run; changed by `withWorkdir` vertices.
    pub work_dir: String,
    /// Nix arguments; initialised to `NixArgs::default()` by `Graph::new`.
    pub nix_args: NixArgs,
}
98
99impl Graph {
100 pub fn new(tx: Sender<(String, usize)>, runner: Arc<Box<dyn Extension + Send + Sync>>) -> Self {
101 let work_dir = current_dir().unwrap().to_str().unwrap().to_string();
102 Graph {
103 vertices: Vec::new(),
104 volumes: Vec::new(),
105 services: Vec::new(),
106 enabled_services: Vec::new(),
107 edges: Vec::new(),
108 tx,
109 runner,
110 work_dir,
111 nix_args: NixArgs::default(),
112 vaults: HashMap::new(),
113 secrets: HashMap::new(),
114 secret_names: HashMap::new(),
115 }
116 }
117
118 pub fn set_secret(&mut self, name: String, value: String) -> Result<String, Error> {
119 let id = Uuid::new_v4().to_string();
120 let key = get_hmac(id.clone(), name.clone())?;
121 self.secrets.insert(key, value);
122 self.secret_names.insert(id.clone(), name);
123 Ok(id)
124 }
125
126 pub fn get_secret_plaintext(&self, secret_id: String, name: String) -> Result<String, Error> {
127 let key = get_hmac(secret_id, name)?;
128 match self.secrets.get(&key) {
129 Some(value) => Ok(value.clone()),
130 None => Err(Error::msg("Secret not found")),
131 }
132 }
133
134 pub fn get_secret(
135 &mut self,
136 secret_manager_id: &str,
137 name: String,
138 ) -> Result<Vec<Secret>, Error> {
139 match self.vaults.get(secret_manager_id) {
140 Some(provider) => {
141 let provider = provider.clone();
142 let cloned_name = name.clone();
143 let handler = thread::spawn(move || {
144 let rt = tokio::runtime::Runtime::new().unwrap();
145 rt.block_on(provider.download_json(&name))
146 });
147 let secrets = match handler.join() {
148 Ok(Ok(secrets)) => secrets,
149 Ok(Err(e)) => return Err(Error::msg(e)),
150 Err(_) => return Err(Error::msg("Failed to join thread")),
151 };
152
153 if secrets.is_empty() {
154 return Err(Error::msg(format!("Secret {} not found", cloned_name)));
155 }
156
157 let mut results = Vec::new();
158 secrets.iter().for_each(|(key, value)| {
159 let id = Uuid::new_v4().to_string();
160 let hmac = get_hmac(id.clone(), key.clone()).unwrap();
161 self.secrets.insert(hmac, value.clone());
162 self.secret_names.insert(id.clone(), key.clone());
163 results.push(Secret {
164 id: id.clone(),
165 name: key.clone(),
166 mount: cloned_name.clone(),
167 });
168 });
169
170 Ok(results)
171 }
172 None => Err(Error::msg(format!(
173 "Secret manager {} not found",
174 secret_manager_id
175 ))),
176 }
177 }
178
179 pub fn register_service(&mut self, id: &str) {
180 if self
182 .vertices
183 .iter()
184 .find(|v| v.id == id && v.label == "asService")
185 .is_none()
186 {
187 return;
188 }
189
190 let vertex = self
191 .vertices
192 .iter()
193 .find(|v| v.id == id && v.label == "asService")
194 .unwrap();
195
196 let mut visited = vec![false; self.vertices.len()];
198 let mut stack = Vec::new();
199 let mut service = Service {
200 id: id.to_string(),
201 name: vertex.command.clone(),
202 vertices: Vec::new(),
203 working_dir: self.work_dir.clone(),
204 };
205
206 let skip = vec![
207 "git",
208 "git-checkout",
209 "git-last-commit",
210 "tree",
211 "http",
212 "cache",
213 "file",
214 "directory",
215 "chmod",
216 "withFile",
217 "asService",
218 "trust",
219 ];
220
221 for (i, vertex) in self.vertices.iter().enumerate() {
222 if vertex.id == id {
223 stack.push(i);
224 break;
225 }
226 }
227
228 while let Some(i) = stack.pop() {
229 if visited[i] {
230 continue;
231 }
232 visited[i] = true;
233
234 for edge in self.edges.iter().filter(|e| e.to == i) {
235 stack.push(edge.from);
236 }
237
238 if skip.contains(&self.vertices[i].label.as_str()) {
239 continue;
240 }
241
242 if self.vertices[i].label == "withWorkdir" {
243 if !Path::new(&self.vertices[i].command).exists() {
244 println!("Error: {}", self.vertices[i].id);
245 match fluentci_logging::error(
246 &format!("Error: {}", self.vertices[i].id),
247 "register-service",
248 ) {
249 Ok(_) => {}
250 Err(e) => {
251 println!("{}", e);
252 }
253 }
254 self.tx.send((self.vertices[i].command.clone(), 1)).unwrap();
255 break;
256 }
257 service.working_dir = self.vertices[i].command.clone();
258 continue;
259 }
260
261 if self.vertices[i].label == "useEnv" {
262 match Envhub::default().r#use(&self.vertices[i].command, &self.work_dir) {
263 Ok(_) => {}
264 Err(e) => {
265 println!("Error: {}", e);
266 self.tx.send((self.vertices[i].command.clone(), 1)).unwrap();
267 break;
268 }
269 }
270 continue;
271 }
272
273 if self.vertices[i].command.is_empty() {
274 continue;
275 }
276
277 let vertex = self.vertices[i].clone();
278 service.vertices.insert(0, vertex);
279 }
280
281 self.services.push(service);
282 }
283
284 pub fn execute(&mut self, command: GraphCommand) -> Result<(), Error> {
285 match command {
286 GraphCommand::AddVolume(id, label, path) => {
287 self.volumes.push(Volume {
288 id,
289 label,
290 key: path.clone(),
291 path,
292 });
293 }
294 GraphCommand::AddVertex(id, label, command, needs, runner) => {
295 match self.vertices.iter_mut().find(|v| v.id == id) {
296 Some(vertex) => vertex.needs.extend(needs),
297 None => self.vertices.push(Vertex {
298 id,
299 label,
300 command,
301 needs,
302 runner,
303 }),
304 }
305 }
306 GraphCommand::AddEdge(from, to) => {
307 self.edges.push(Edge { from, to });
308 }
309 GraphCommand::AddEnvVariable(key, value) => {
310 env::set_var(key, value);
311 }
312 GraphCommand::AddSecretVariable(env_name, secret_id, secret_name) => {
313 let value = self.get_secret_plaintext(secret_id, secret_name)?;
314 env::set_var(env_name, value);
315 }
316 GraphCommand::Execute(Output::Stdout) => {
317 self.execute_graph(Output::Stdout);
318 }
319 GraphCommand::Execute(Output::Stderr) => {
320 self.execute_graph(Output::Stderr);
321 }
322 GraphCommand::EnableService(id) => match self.services.iter().find(|s| s.id == id) {
323 Some(service) => self.enabled_services.push(service.clone()),
324 None => return Err(Error::msg("Service not found")),
325 },
326 GraphCommand::AddSecretManager(id, provider) => match provider {
327 Provider::Aws(config) => {
328 self.vaults
329 .insert(id, Arc::new(Box::new(config.into_vault()?)));
330 }
331 Provider::Google(config) => {
332 self.vaults
333 .insert(id, Arc::new(Box::new(config.into_vault()?)));
334 }
335 Provider::Hashicorp(config) => {
336 self.vaults
337 .insert(id, Arc::new(Box::new(config.into_vault()?)));
338 }
339 Provider::Azure(config) => {
340 self.vaults
341 .insert(id, Arc::new(Box::new(config.into_vault()?)));
342 }
343 },
344 }
345 Ok(())
346 }
347
348 pub fn execute_services(&mut self, ctx: &Context) -> Result<(), Error> {
349 if self.enabled_services.is_empty() {
350 return Ok(());
351 }
352 let tracer_provider = global::tracer_provider();
353 let tracer = tracer_provider.versioned_tracer(
354 "fluentci-core",
355 Some(env!("CARGO_PKG_VERSION")),
356 Some("https://opentelemetry.io/schemas/1.17.0"),
357 None,
358 );
359
360 let mut span = tracer.start_with_context("start services", &ctx);
361
362 let process_compose_config = process_compose::ConfigFile {
363 version: "0.5".to_string(),
364 processes: self
365 .enabled_services
366 .iter()
367 .map(|s| (s.name.clone(), s.clone().into()))
368 .collect(),
369 ..Default::default()
370 };
371 let yaml = serde_yaml::to_string(&process_compose_config)?;
372
373 span.set_attribute(KeyValue::new("process_compose", yaml.clone()));
374
375 let label = format!("[{}]", "start services");
376 println!("{}", label.cyan());
377 fluentci_logging::info(&label, "process-compose")?;
378 fluentci_logging::info(&yaml, "process-compose")?;
379
380 thread::spawn(move || {
381 let (tx, _rx) = mpsc::channel();
382 match ServiceExt::default().exec(&yaml, tx.clone(), Output::Stdout, true, ".") {
383 Ok(_) => {}
384 Err(e) => {
385 println!("{}", e);
386 match fluentci_logging::error(&e.to_string(), "process-compose") {
387 Ok(_) => {}
388 Err(e) => {
389 println!("{}", e);
390 }
391 }
392 }
393 }
394 });
395
396 span.end();
397
398 thread::sleep(std::time::Duration::from_secs(5));
399
400 Ok(())
401 }
402
403 pub fn execute_graph(&mut self, output: Output) {
404 let tracer_provider = global::tracer_provider();
405 let tracer = tracer_provider.versioned_tracer(
406 "fluentci-core",
407 Some(env!("CARGO_PKG_VERSION")),
408 Some("https://opentelemetry.io/schemas/1.17.0"),
409 None,
410 );
411
412 let skip = vec![
413 "git",
414 "git-checkout",
415 "git-last-commit",
416 "tree",
417 "http",
418 "cache",
419 "file",
420 "directory",
421 "chmod",
422 "withFile",
423 "asService",
424 "trust",
425 ];
426 let mut visited = vec![false; self.vertices.len()];
427 let mut stack = Vec::new();
428 for (i, vertex) in self.vertices.iter().enumerate() {
429 if vertex.needs.is_empty() {
430 stack.push(i);
431 }
432 }
433 let root_span = tracer.start("root");
434 let context = Context::current_with_span(root_span);
435
436 match self.execute_services(&context) {
437 Ok(_) => {}
438 Err(e) => {
439 println!("Error: {}", e);
440 self.tx.send(("Error".to_string(), 1)).unwrap();
441 return;
442 }
443 }
444
445 while let Some(i) = stack.pop() {
446 let label = &self.vertices[i].label.as_str();
447 if visited[i] {
448 continue;
449 }
450 visited[i] = true;
451 for edge in self.edges.iter().filter(|e| e.from == i) {
452 stack.push(edge.to);
453 }
454
455 if skip.contains(&label) {
456 continue;
457 }
458
459 let (tx, rx) = mpsc::channel();
460 let mut span = tracer.start_with_context(
461 format!("{} {}", label.to_string(), self.vertices[i].command.clone()),
462 &context,
463 );
464
465 span.set_attribute(KeyValue::new("command", self.vertices[i].command.clone()));
466
467 if self.vertices[i].label == "withWorkdir" {
468 if !Path::new(&self.vertices[i].command).exists() {
469 println!("Error: {}", self.vertices[i].id);
470 span.end();
471 self.tx.send((self.vertices[i].command.clone(), 1)).unwrap();
472 break;
473 }
474 self.work_dir = self.vertices[i].command.clone();
475 span.end();
476 continue;
477 }
478
479 if self.vertices[i].label == "useEnv" {
480 match Envhub::default().r#use(&self.vertices[i].command, &self.work_dir) {
481 Ok(_) => {
482 span.end();
483 }
484 Err(e) => {
485 println!("Error: {}", e);
486 span.end();
487 self.tx.send((self.vertices[i].command.clone(), 1)).unwrap();
488 break;
489 }
490 }
491 span.end();
492 continue;
493 }
494
495 match self.vertices[i].run(tx, output.clone(), stack.len() == 1, &self.work_dir) {
496 Ok(status) => {
497 if !status.success() {
498 println!("Error: {}", self.vertices[i].id);
499 span.end();
500 self.tx.send((self.vertices[i].command.clone(), 1)).unwrap();
501 break;
502 }
503 }
504 Err(e) => {
505 println!("Error: {}", e);
506 span.end();
507 self.tx.send((self.vertices[i].command.clone(), 1)).unwrap();
508 break;
509 }
510 };
511
512 if stack.len() == 1 {
513 if let Ok(command_output) = rx.recv() {
514 match self.tx.send((command_output, 0)) {
515 Ok(_) => {}
516 Err(e) => {
517 println!("Error: {}", e);
518 }
519 }
520 }
521 }
522 span.end();
523 }
524
525 self.post_execute(&context);
526 }
527
528 pub fn execute_vertex(&mut self, id: &str) -> Result<String, Error> {
529 let tracer_provider = global::tracer_provider();
530 let tracer = tracer_provider.versioned_tracer(
531 "fluentci-core",
532 Some(env!("CARGO_PKG_VERSION")),
533 Some("https://opentelemetry.io/schemas/1.17.0"),
534 None,
535 );
536
537 let mut result = String::from("");
538 let mut visited = vec![false; self.vertices.len()];
539 let mut stack = Vec::new();
540 let mut index = 0;
541 for (i, vertex) in self.vertices.iter().enumerate() {
542 if vertex.id == id {
543 index = i;
544 break;
545 }
546 }
547 stack.push(index);
548 while let Some(i) = stack.pop() {
549 if visited[i] {
550 continue;
551 }
552 visited[i] = true;
553 for edge in self.edges.iter().filter(|e| e.from == i) {
554 stack.push(edge.to);
555 }
556 let (tx, rx) = mpsc::channel();
557 let label = self.vertices[i].label.clone();
558 let command = self.vertices[i].command.clone();
559 let mut span = tracer.start(format!("{} {}", label, command.clone()));
560 span.set_attribute(KeyValue::new("command", command.clone()));
561
562 if self.vertices[i].label == "withWorkdir" {
563 if !Path::new(&self.vertices[i].command).exists() {
564 span.end();
565 return Err(Error::msg(format!("Error: {}", self.vertices[i].id)));
566 }
567 self.work_dir = self.vertices[i].command.clone();
568 span.end();
569 continue;
570 }
571
572 match self.vertices[i].run(tx, Output::Stdout, true, &self.work_dir) {
573 Ok(status) => {
574 if !status.success() {
575 span.end();
576 return Err(Error::msg(format!("Error: {}", self.vertices[i].id)));
577 }
578 result = rx.recv()?;
579 span.end();
580 }
581 Err(e) => {
582 span.end();
583 return Err(Error::msg(format!("Error: {}", e)));
584 }
585 };
586 }
587
588 Ok(result)
589 }
590
591 pub fn post_execute(&mut self, ctx: &Context) {
592 if !self.enabled_services.is_empty() {
593 let (tx, _) = mpsc::channel();
594 match ServiceExt::default().post_setup(tx) {
595 Ok(_) => {}
596 Err(e) => {
597 println!("Failed to stop services: {}", e);
598 }
599 }
600 }
601
602 self.enabled_services.clear();
603 let tracer_provider = global::tracer_provider();
604 let tracer = tracer_provider.versioned_tracer(
605 "fluentci-core",
606 Some(env!("CARGO_PKG_VERSION")),
607 Some("https://opentelemetry.io/schemas/1.17.0"),
608 None,
609 );
610
611 let only = vec!["withCache", "withService"];
612
613 let mut visited = vec![false; self.vertices.len()];
614 let mut stack = Vec::new();
615 for (i, vertex) in self.vertices.iter().enumerate() {
616 if vertex.needs.is_empty() {
617 stack.push(i);
618 }
619 }
620
621 while let Some(i) = stack.pop() {
622 let label = &self.vertices[i].label.as_str();
623 if visited[i] {
624 continue;
625 }
626 visited[i] = true;
627 for edge in self.edges.iter().filter(|e| e.from == i) {
628 stack.push(edge.to);
629 }
630
631 if !only.contains(&label) {
632 continue;
633 }
634
635 let mut span = tracer.start_with_context(
636 format!("{} {}", label.to_string(), self.vertices[i].command.clone()),
637 &ctx,
638 );
639 span.set_attribute(KeyValue::new("command", self.vertices[i].command.clone()));
640 let (tx, rx) = mpsc::channel();
641 match self.vertices[i].runner.post_setup(tx) {
642 Ok(_) => {
643 span.end();
644 }
645 Err(e) => {
646 println!("Error: {}", e);
647 match fluentci_logging::error(&e.to_string(), "post-setup") {
648 Ok(_) => {}
649 Err(e) => {
650 println!("{}", e);
651 }
652 }
653 span.end();
654 break;
655 }
656 }
657 rx.recv().unwrap();
658 }
659 }
660
    /// Returns the number of vertices currently in the graph.
    pub fn size(&self) -> usize {
        self.vertices.len()
    }
664
    /// Removes all vertices and edges and resets the working directory to the
    /// process's current directory.
    ///
    /// Volumes, services, vaults and secrets are left untouched.
    ///
    /// # Panics
    ///
    /// Panics if the current directory cannot be determined or is not valid
    /// UTF-8.
    pub fn reset(&mut self) {
        self.vertices.clear();
        self.edges.clear();
        self.work_dir = current_dir().unwrap().to_str().unwrap().to_string();
    }
670}
671
#[cfg(test)]
mod tests {
    use std::sync::mpsc;

    use fluentci_ext::runner::Runner;

    use super::*;

    /// Builds a diamond-shaped graph (1 -> 2, 1 -> 3, 2 -> 4, 3 -> 4), checks
    /// the stored vertices, executes the graph and resets it.
    #[test]
    fn test_graph() -> Result<(), Error> {
        // Keep the receiver alive for the whole test: binding it to `_` would
        // drop it immediately, and every send from the graph would then fail.
        let (tx, _rx) = mpsc::channel();
        let mut graph = Graph::new(tx, Arc::new(Box::new(Runner::default())));

        // (id, label, command, needs)
        let specs: [(&str, &str, &str, Vec<String>); 4] = [
            ("1", "A", "echo A", vec![]),
            ("2", "B", "echo B", vec!["1".into()]),
            ("3", "C", "echo C", vec!["1".into()]),
            ("4", "D", "echo D", vec!["2".into(), "3".into()]),
        ];
        for (id, label, command, needs) in specs.iter() {
            graph.execute(GraphCommand::AddVertex(
                id.to_string(),
                label.to_string(),
                command.to_string(),
                needs.clone(),
                Arc::new(Box::new(Runner::default())),
            ))?;
        }

        let edges: [(usize, usize); 4] = [(0, 1), (0, 2), (1, 3), (2, 3)];
        for (from, to) in edges.iter() {
            graph.execute(GraphCommand::AddEdge(*from, *to))?;
        }

        assert_eq!(graph.size(), 4);
        for (vertex, (id, label, command, _)) in graph.vertices.iter().zip(specs.iter()) {
            assert_eq!(vertex.id, *id);
            assert_eq!(vertex.label, *label);
            assert_eq!(vertex.command, *command);
        }

        graph.execute(GraphCommand::Execute(Output::Stdout))?;

        graph.reset();

        assert_eq!(graph.size(), 0);
        Ok(())
    }
}