fluentci_core/
deps.rs

1use anyhow::Error;
2use fluentci_secrets::{Provider, Vault, VaultConfig};
3use opentelemetry::{
4    global,
5    trace::{Span, TraceContextExt, Tracer, TracerProvider},
6    Context, KeyValue,
7};
8use owo_colors::OwoColorize;
9use std::sync::mpsc::{self, Sender};
10use std::sync::Arc;
11use std::{
12    collections::HashMap,
13    env::{self, current_dir},
14};
15use std::{path::Path, thread};
16use uuid::Uuid;
17
18use fluentci_ext::envhub::Envhub;
19use fluentci_ext::service::Service as ServiceExt;
20use fluentci_ext::Extension;
21use fluentci_types::{
22    nix::NixArgs,
23    process_compose::{self, Process},
24    secret::Secret,
25    Output,
26};
27
28use crate::get_hmac;
29
30use super::edge::Edge;
31use super::vertex::{Runnable, Vertex};
32
33#[derive(Default, Debug, Clone)]
34pub struct Volume {
35    pub id: String,
36    pub label: String,
37    pub path: String,
38    pub key: String,
39}
40
41#[derive(Default, Clone)]
42pub struct Service {
43    pub id: String,
44    pub name: String,
45    pub vertices: Vec<Vertex>,
46    pub working_dir: String,
47}
48
49impl Into<Process> for Service {
50    fn into(self) -> Process {
51        Process {
52            command: self
53                .vertices
54                .iter()
55                .map(|v| v.runner.format_command(&v.command))
56                .collect::<Vec<String>>()
57                .join(" ; "),
58            depends_on: None,
59            working_dir: Some(self.working_dir),
60            ..Default::default()
61        }
62    }
63}
64
65pub enum GraphCommand {
66    AddVolume(String, String, String),
67    AddVertex(
68        String,
69        String,
70        String,
71        Vec<String>,
72        Arc<Box<dyn Extension + Send + Sync>>,
73    ),
74    AddEnvVariable(String, String),
75    AddSecretVariable(String, String, String),
76    EnableService(String),
77    AddEdge(usize, usize),
78    Execute(Output),
79    AddSecretManager(String, Provider),
80}
81
82#[derive(Clone)]
83pub struct Graph {
84    pub vertices: Vec<Vertex>,
85    pub edges: Vec<Edge>,
86    pub volumes: Vec<Volume>,
87    pub services: Vec<Service>,
88    pub enabled_services: Vec<Service>,
89    pub vaults: HashMap<String, Arc<Box<dyn Vault + Send + Sync>>>,
90    pub secrets: HashMap<String, String>,
91    pub secret_names: HashMap<String, String>,
92
93    tx: Sender<(String, usize)>,
94    pub runner: Arc<Box<dyn Extension + Send + Sync>>,
95    pub work_dir: String,
96    pub nix_args: NixArgs,
97}
98
99impl Graph {
100    pub fn new(tx: Sender<(String, usize)>, runner: Arc<Box<dyn Extension + Send + Sync>>) -> Self {
101        let work_dir = current_dir().unwrap().to_str().unwrap().to_string();
102        Graph {
103            vertices: Vec::new(),
104            volumes: Vec::new(),
105            services: Vec::new(),
106            enabled_services: Vec::new(),
107            edges: Vec::new(),
108            tx,
109            runner,
110            work_dir,
111            nix_args: NixArgs::default(),
112            vaults: HashMap::new(),
113            secrets: HashMap::new(),
114            secret_names: HashMap::new(),
115        }
116    }
117
118    pub fn set_secret(&mut self, name: String, value: String) -> Result<String, Error> {
119        let id = Uuid::new_v4().to_string();
120        let key = get_hmac(id.clone(), name.clone())?;
121        self.secrets.insert(key, value);
122        self.secret_names.insert(id.clone(), name);
123        Ok(id)
124    }
125
126    pub fn get_secret_plaintext(&self, secret_id: String, name: String) -> Result<String, Error> {
127        let key = get_hmac(secret_id, name)?;
128        match self.secrets.get(&key) {
129            Some(value) => Ok(value.clone()),
130            None => Err(Error::msg("Secret not found")),
131        }
132    }
133
134    pub fn get_secret(
135        &mut self,
136        secret_manager_id: &str,
137        name: String,
138    ) -> Result<Vec<Secret>, Error> {
139        match self.vaults.get(secret_manager_id) {
140            Some(provider) => {
141                let provider = provider.clone();
142                let cloned_name = name.clone();
143                let handler = thread::spawn(move || {
144                    let rt = tokio::runtime::Runtime::new().unwrap();
145                    rt.block_on(provider.download_json(&name))
146                });
147                let secrets = match handler.join() {
148                    Ok(Ok(secrets)) => secrets,
149                    Ok(Err(e)) => return Err(Error::msg(e)),
150                    Err(_) => return Err(Error::msg("Failed to join thread")),
151                };
152
153                if secrets.is_empty() {
154                    return Err(Error::msg(format!("Secret {} not found", cloned_name)));
155                }
156
157                let mut results = Vec::new();
158                secrets.iter().for_each(|(key, value)| {
159                    let id = Uuid::new_v4().to_string();
160                    let hmac = get_hmac(id.clone(), key.clone()).unwrap();
161                    self.secrets.insert(hmac, value.clone());
162                    self.secret_names.insert(id.clone(), key.clone());
163                    results.push(Secret {
164                        id: id.clone(),
165                        name: key.clone(),
166                        mount: cloned_name.clone(),
167                    });
168                });
169
170                Ok(results)
171            }
172            None => Err(Error::msg(format!(
173                "Secret manager {} not found",
174                secret_manager_id
175            ))),
176        }
177    }
178
179    pub fn register_service(&mut self, id: &str) {
180        // return if vertex at id is not found or is not a service
181        if self
182            .vertices
183            .iter()
184            .find(|v| v.id == id && v.label == "asService")
185            .is_none()
186        {
187            return;
188        }
189
190        let vertex = self
191            .vertices
192            .iter()
193            .find(|v| v.id == id && v.label == "asService")
194            .unwrap();
195
196        // browse the graph dependencies of the service
197        let mut visited = vec![false; self.vertices.len()];
198        let mut stack = Vec::new();
199        let mut service = Service {
200            id: id.to_string(),
201            name: vertex.command.clone(),
202            vertices: Vec::new(),
203            working_dir: self.work_dir.clone(),
204        };
205
206        let skip = vec![
207            "git",
208            "git-checkout",
209            "git-last-commit",
210            "tree",
211            "http",
212            "cache",
213            "file",
214            "directory",
215            "chmod",
216            "withFile",
217            "asService",
218            "trust",
219        ];
220
221        for (i, vertex) in self.vertices.iter().enumerate() {
222            if vertex.id == id {
223                stack.push(i);
224                break;
225            }
226        }
227
228        while let Some(i) = stack.pop() {
229            if visited[i] {
230                continue;
231            }
232            visited[i] = true;
233
234            for edge in self.edges.iter().filter(|e| e.to == i) {
235                stack.push(edge.from);
236            }
237
238            if skip.contains(&self.vertices[i].label.as_str()) {
239                continue;
240            }
241
242            if self.vertices[i].label == "withWorkdir" {
243                if !Path::new(&self.vertices[i].command).exists() {
244                    println!("Error: {}", self.vertices[i].id);
245                    match fluentci_logging::error(
246                        &format!("Error: {}", self.vertices[i].id),
247                        "register-service",
248                    ) {
249                        Ok(_) => {}
250                        Err(e) => {
251                            println!("{}", e);
252                        }
253                    }
254                    self.tx.send((self.vertices[i].command.clone(), 1)).unwrap();
255                    break;
256                }
257                service.working_dir = self.vertices[i].command.clone();
258                continue;
259            }
260
261            if self.vertices[i].label == "useEnv" {
262                match Envhub::default().r#use(&self.vertices[i].command, &self.work_dir) {
263                    Ok(_) => {}
264                    Err(e) => {
265                        println!("Error: {}", e);
266                        self.tx.send((self.vertices[i].command.clone(), 1)).unwrap();
267                        break;
268                    }
269                }
270                continue;
271            }
272
273            if self.vertices[i].command.is_empty() {
274                continue;
275            }
276
277            let vertex = self.vertices[i].clone();
278            service.vertices.insert(0, vertex);
279        }
280
281        self.services.push(service);
282    }
283
284    pub fn execute(&mut self, command: GraphCommand) -> Result<(), Error> {
285        match command {
286            GraphCommand::AddVolume(id, label, path) => {
287                self.volumes.push(Volume {
288                    id,
289                    label,
290                    key: path.clone(),
291                    path,
292                });
293            }
294            GraphCommand::AddVertex(id, label, command, needs, runner) => {
295                match self.vertices.iter_mut().find(|v| v.id == id) {
296                    Some(vertex) => vertex.needs.extend(needs),
297                    None => self.vertices.push(Vertex {
298                        id,
299                        label,
300                        command,
301                        needs,
302                        runner,
303                    }),
304                }
305            }
306            GraphCommand::AddEdge(from, to) => {
307                self.edges.push(Edge { from, to });
308            }
309            GraphCommand::AddEnvVariable(key, value) => {
310                env::set_var(key, value);
311            }
312            GraphCommand::AddSecretVariable(env_name, secret_id, secret_name) => {
313                let value = self.get_secret_plaintext(secret_id, secret_name)?;
314                env::set_var(env_name, value);
315            }
316            GraphCommand::Execute(Output::Stdout) => {
317                self.execute_graph(Output::Stdout);
318            }
319            GraphCommand::Execute(Output::Stderr) => {
320                self.execute_graph(Output::Stderr);
321            }
322            GraphCommand::EnableService(id) => match self.services.iter().find(|s| s.id == id) {
323                Some(service) => self.enabled_services.push(service.clone()),
324                None => return Err(Error::msg("Service not found")),
325            },
326            GraphCommand::AddSecretManager(id, provider) => match provider {
327                Provider::Aws(config) => {
328                    self.vaults
329                        .insert(id, Arc::new(Box::new(config.into_vault()?)));
330                }
331                Provider::Google(config) => {
332                    self.vaults
333                        .insert(id, Arc::new(Box::new(config.into_vault()?)));
334                }
335                Provider::Hashicorp(config) => {
336                    self.vaults
337                        .insert(id, Arc::new(Box::new(config.into_vault()?)));
338                }
339                Provider::Azure(config) => {
340                    self.vaults
341                        .insert(id, Arc::new(Box::new(config.into_vault()?)));
342                }
343            },
344        }
345        Ok(())
346    }
347
348    pub fn execute_services(&mut self, ctx: &Context) -> Result<(), Error> {
349        if self.enabled_services.is_empty() {
350            return Ok(());
351        }
352        let tracer_provider = global::tracer_provider();
353        let tracer = tracer_provider.versioned_tracer(
354            "fluentci-core",
355            Some(env!("CARGO_PKG_VERSION")),
356            Some("https://opentelemetry.io/schemas/1.17.0"),
357            None,
358        );
359
360        let mut span = tracer.start_with_context("start services", &ctx);
361
362        let process_compose_config = process_compose::ConfigFile {
363            version: "0.5".to_string(),
364            processes: self
365                .enabled_services
366                .iter()
367                .map(|s| (s.name.clone(), s.clone().into()))
368                .collect(),
369            ..Default::default()
370        };
371        let yaml = serde_yaml::to_string(&process_compose_config)?;
372
373        span.set_attribute(KeyValue::new("process_compose", yaml.clone()));
374
375        let label = format!("[{}]", "start services");
376        println!("{}", label.cyan());
377        fluentci_logging::info(&label, "process-compose")?;
378        fluentci_logging::info(&yaml, "process-compose")?;
379
380        thread::spawn(move || {
381            let (tx, _rx) = mpsc::channel();
382            match ServiceExt::default().exec(&yaml, tx.clone(), Output::Stdout, true, ".") {
383                Ok(_) => {}
384                Err(e) => {
385                    println!("{}", e);
386                    match fluentci_logging::error(&e.to_string(), "process-compose") {
387                        Ok(_) => {}
388                        Err(e) => {
389                            println!("{}", e);
390                        }
391                    }
392                }
393            }
394        });
395
396        span.end();
397
398        thread::sleep(std::time::Duration::from_secs(5));
399
400        Ok(())
401    }
402
403    pub fn execute_graph(&mut self, output: Output) {
404        let tracer_provider = global::tracer_provider();
405        let tracer = tracer_provider.versioned_tracer(
406            "fluentci-core",
407            Some(env!("CARGO_PKG_VERSION")),
408            Some("https://opentelemetry.io/schemas/1.17.0"),
409            None,
410        );
411
412        let skip = vec![
413            "git",
414            "git-checkout",
415            "git-last-commit",
416            "tree",
417            "http",
418            "cache",
419            "file",
420            "directory",
421            "chmod",
422            "withFile",
423            "asService",
424            "trust",
425        ];
426        let mut visited = vec![false; self.vertices.len()];
427        let mut stack = Vec::new();
428        for (i, vertex) in self.vertices.iter().enumerate() {
429            if vertex.needs.is_empty() {
430                stack.push(i);
431            }
432        }
433        let root_span = tracer.start("root");
434        let context = Context::current_with_span(root_span);
435
436        match self.execute_services(&context) {
437            Ok(_) => {}
438            Err(e) => {
439                println!("Error: {}", e);
440                self.tx.send(("Error".to_string(), 1)).unwrap();
441                return;
442            }
443        }
444
445        while let Some(i) = stack.pop() {
446            let label = &self.vertices[i].label.as_str();
447            if visited[i] {
448                continue;
449            }
450            visited[i] = true;
451            for edge in self.edges.iter().filter(|e| e.from == i) {
452                stack.push(edge.to);
453            }
454
455            if skip.contains(&label) {
456                continue;
457            }
458
459            let (tx, rx) = mpsc::channel();
460            let mut span = tracer.start_with_context(
461                format!("{} {}", label.to_string(), self.vertices[i].command.clone()),
462                &context,
463            );
464
465            span.set_attribute(KeyValue::new("command", self.vertices[i].command.clone()));
466
467            if self.vertices[i].label == "withWorkdir" {
468                if !Path::new(&self.vertices[i].command).exists() {
469                    println!("Error: {}", self.vertices[i].id);
470                    span.end();
471                    self.tx.send((self.vertices[i].command.clone(), 1)).unwrap();
472                    break;
473                }
474                self.work_dir = self.vertices[i].command.clone();
475                span.end();
476                continue;
477            }
478
479            if self.vertices[i].label == "useEnv" {
480                match Envhub::default().r#use(&self.vertices[i].command, &self.work_dir) {
481                    Ok(_) => {
482                        span.end();
483                    }
484                    Err(e) => {
485                        println!("Error: {}", e);
486                        span.end();
487                        self.tx.send((self.vertices[i].command.clone(), 1)).unwrap();
488                        break;
489                    }
490                }
491                span.end();
492                continue;
493            }
494
495            match self.vertices[i].run(tx, output.clone(), stack.len() == 1, &self.work_dir) {
496                Ok(status) => {
497                    if !status.success() {
498                        println!("Error: {}", self.vertices[i].id);
499                        span.end();
500                        self.tx.send((self.vertices[i].command.clone(), 1)).unwrap();
501                        break;
502                    }
503                }
504                Err(e) => {
505                    println!("Error: {}", e);
506                    span.end();
507                    self.tx.send((self.vertices[i].command.clone(), 1)).unwrap();
508                    break;
509                }
510            };
511
512            if stack.len() == 1 {
513                if let Ok(command_output) = rx.recv() {
514                    match self.tx.send((command_output, 0)) {
515                        Ok(_) => {}
516                        Err(e) => {
517                            println!("Error: {}", e);
518                        }
519                    }
520                }
521            }
522            span.end();
523        }
524
525        self.post_execute(&context);
526    }
527
528    pub fn execute_vertex(&mut self, id: &str) -> Result<String, Error> {
529        let tracer_provider = global::tracer_provider();
530        let tracer = tracer_provider.versioned_tracer(
531            "fluentci-core",
532            Some(env!("CARGO_PKG_VERSION")),
533            Some("https://opentelemetry.io/schemas/1.17.0"),
534            None,
535        );
536
537        let mut result = String::from("");
538        let mut visited = vec![false; self.vertices.len()];
539        let mut stack = Vec::new();
540        let mut index = 0;
541        for (i, vertex) in self.vertices.iter().enumerate() {
542            if vertex.id == id {
543                index = i;
544                break;
545            }
546        }
547        stack.push(index);
548        while let Some(i) = stack.pop() {
549            if visited[i] {
550                continue;
551            }
552            visited[i] = true;
553            for edge in self.edges.iter().filter(|e| e.from == i) {
554                stack.push(edge.to);
555            }
556            let (tx, rx) = mpsc::channel();
557            let label = self.vertices[i].label.clone();
558            let command = self.vertices[i].command.clone();
559            let mut span = tracer.start(format!("{} {}", label, command.clone()));
560            span.set_attribute(KeyValue::new("command", command.clone()));
561
562            if self.vertices[i].label == "withWorkdir" {
563                if !Path::new(&self.vertices[i].command).exists() {
564                    span.end();
565                    return Err(Error::msg(format!("Error: {}", self.vertices[i].id)));
566                }
567                self.work_dir = self.vertices[i].command.clone();
568                span.end();
569                continue;
570            }
571
572            match self.vertices[i].run(tx, Output::Stdout, true, &self.work_dir) {
573                Ok(status) => {
574                    if !status.success() {
575                        span.end();
576                        return Err(Error::msg(format!("Error: {}", self.vertices[i].id)));
577                    }
578                    result = rx.recv()?;
579                    span.end();
580                }
581                Err(e) => {
582                    span.end();
583                    return Err(Error::msg(format!("Error: {}", e)));
584                }
585            };
586        }
587
588        Ok(result)
589    }
590
591    pub fn post_execute(&mut self, ctx: &Context) {
592        if !self.enabled_services.is_empty() {
593            let (tx, _) = mpsc::channel();
594            match ServiceExt::default().post_setup(tx) {
595                Ok(_) => {}
596                Err(e) => {
597                    println!("Failed to stop services: {}", e);
598                }
599            }
600        }
601
602        self.enabled_services.clear();
603        let tracer_provider = global::tracer_provider();
604        let tracer = tracer_provider.versioned_tracer(
605            "fluentci-core",
606            Some(env!("CARGO_PKG_VERSION")),
607            Some("https://opentelemetry.io/schemas/1.17.0"),
608            None,
609        );
610
611        let only = vec!["withCache", "withService"];
612
613        let mut visited = vec![false; self.vertices.len()];
614        let mut stack = Vec::new();
615        for (i, vertex) in self.vertices.iter().enumerate() {
616            if vertex.needs.is_empty() {
617                stack.push(i);
618            }
619        }
620
621        while let Some(i) = stack.pop() {
622            let label = &self.vertices[i].label.as_str();
623            if visited[i] {
624                continue;
625            }
626            visited[i] = true;
627            for edge in self.edges.iter().filter(|e| e.from == i) {
628                stack.push(edge.to);
629            }
630
631            if !only.contains(&label) {
632                continue;
633            }
634
635            let mut span = tracer.start_with_context(
636                format!("{} {}", label.to_string(), self.vertices[i].command.clone()),
637                &ctx,
638            );
639            span.set_attribute(KeyValue::new("command", self.vertices[i].command.clone()));
640            let (tx, rx) = mpsc::channel();
641            match self.vertices[i].runner.post_setup(tx) {
642                Ok(_) => {
643                    span.end();
644                }
645                Err(e) => {
646                    println!("Error: {}", e);
647                    match fluentci_logging::error(&e.to_string(), "post-setup") {
648                        Ok(_) => {}
649                        Err(e) => {
650                            println!("{}", e);
651                        }
652                    }
653                    span.end();
654                    break;
655                }
656            }
657            rx.recv().unwrap();
658        }
659    }
660
661    pub fn size(&self) -> usize {
662        self.vertices.len()
663    }
664
665    pub fn reset(&mut self) {
666        self.vertices.clear();
667        self.edges.clear();
668        self.work_dir = current_dir().unwrap().to_str().unwrap().to_string();
669    }
670}
671
672#[cfg(test)]
673mod tests {
674    use std::sync::mpsc;
675
676    use fluentci_ext::runner::Runner;
677
678    use super::*;
679
680    #[test]
681    fn test_graph() -> Result<(), Error> {
682        let (tx, _) = mpsc::channel();
683        let mut graph = Graph::new(tx, Arc::new(Box::new(Runner::default())));
684        graph.execute(GraphCommand::AddVertex(
685            "1".into(),
686            "A".into(),
687            "echo A".into(),
688            vec![],
689            Arc::new(Box::new(Runner::default())),
690        ))?;
691        graph.execute(GraphCommand::AddVertex(
692            "2".into(),
693            "B".into(),
694            "echo B".into(),
695            vec!["1".into()],
696            Arc::new(Box::new(Runner::default())),
697        ))?;
698        graph.execute(GraphCommand::AddVertex(
699            "3".into(),
700            "C".into(),
701            "echo C".into(),
702            vec!["1".into()],
703            Arc::new(Box::new(Runner::default())),
704        ))?;
705        graph.execute(GraphCommand::AddVertex(
706            "4".into(),
707            "D".into(),
708            "echo D".into(),
709            vec!["2".into(), "3".into()],
710            Arc::new(Box::new(Runner::default())),
711        ))?;
712        graph.execute(GraphCommand::AddEdge(0, 1))?;
713        graph.execute(GraphCommand::AddEdge(0, 2))?;
714        graph.execute(GraphCommand::AddEdge(1, 3))?;
715        graph.execute(GraphCommand::AddEdge(2, 3))?;
716
717        assert_eq!(graph.size(), 4);
718        assert_eq!(graph.vertices[0].id, "1");
719        assert_eq!(graph.vertices[1].id, "2");
720        assert_eq!(graph.vertices[2].id, "3");
721        assert_eq!(graph.vertices[3].id, "4");
722        assert_eq!(graph.vertices[0].label, "A");
723        assert_eq!(graph.vertices[1].label, "B");
724        assert_eq!(graph.vertices[2].label, "C");
725        assert_eq!(graph.vertices[3].label, "D");
726        assert_eq!(graph.vertices[0].command, "echo A");
727        assert_eq!(graph.vertices[1].command, "echo B");
728        assert_eq!(graph.vertices[2].command, "echo C");
729        assert_eq!(graph.vertices[3].command, "echo D");
730
731        graph.execute(GraphCommand::Execute(Output::Stdout))?;
732
733        graph.reset();
734
735        assert_eq!(graph.size(), 0);
736        Ok(())
737    }
738}