flowrclib/compiler/
compile.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::path::{Path, PathBuf};
3
4use log::{debug, info};
5use serde_derive::Serialize;
6use url::Url;
7
8use flowcore::model::connection::Connection;
9use flowcore::model::flow_definition::FlowDefinition;
10use flowcore::model::function_definition::FunctionDefinition;
11use flowcore::model::name::HasName;
12use flowcore::model::output_connection::{OutputConnection, Source};
13use flowcore::model::output_connection::Source::{Input, Output};
14use flowcore::model::route::{HasRoute, Route};
15
16use crate::compiler::compile_wasm;
17use crate::errors::*;
18
19use super::checker;
20use super::gatherer;
21use super::optimizer;
22
/// [CompilerTables] are built from the flattened and connected flow model in memory and are
/// used to generate the flow's manifest ready to be executed.
#[derive(Serialize, Default)]
pub struct CompilerTables {
    /// The set of connections between functions in the compiled flow
    pub connections: Vec<Connection>,
    /// Map from "route of an input or an output of a function" --> (`Source`, function_id),
    /// where `Source` is either the input number or the output name.
    /// Populated by `create_routes_table` and used to find the function a connection comes from
    pub sources: BTreeMap<Route, (Source, usize)>,
    /// Map from "route of the input of a function" --> (destination_function_id, input number, flow_id).
    /// Populated by `create_routes_table` and used to find the function a connection goes to
    pub destination_routes: BTreeMap<Route, (usize, usize, usize)>,
    /// The connections searched by `connection_to` and used to configure each function's
    /// output connections (presumably filled in by `gatherer::collapse_connections` - TODO confirm)
    pub collapsed_connections: Vec<Connection>,
    /// The set of functions left in a flow after it has been flattened, connected and optimized
    pub functions: Vec<FunctionDefinition>,
    /// The set of libraries used by a flow, from their Urls
    pub libs: BTreeSet<Url>,
    /// The set of context functions used by a flow, from their Urls
    pub context_functions: BTreeSet<Url>,
    /// The list of source files that were used in the flow definition
    pub source_files: Vec<String>,
}
44
45impl CompilerTables {
46    /// Create a new set of `CompilerTables` for use in compiling a flow
47    pub fn new() -> Self {
48        CompilerTables {
49            connections: Vec::new(),
50            sources: BTreeMap::<Route, (Source, usize)>::new(),
51            destination_routes: BTreeMap::<Route, (usize, usize, usize)>::new(),
52            collapsed_connections: Vec::new(),
53            functions: Vec::new(),
54            libs: BTreeSet::new(),
55            context_functions: BTreeSet::new(),
56            source_files: Vec::new(),
57        }
58    }
59
60    // TODO limit lifetime to table lifetime and return a reference
61    // TODO make collapsed_connections a Map and just try and get by to_io.route()
62    /// Return an optional connection found to a destination input
63    pub fn connection_to(&self, input: &Route) -> Option<Connection> {
64        for connection in &self.collapsed_connections {
65            if connection.to_io().route() == input {
66                return Some(connection.clone());
67            }
68        }
69        None
70    }
71
72    /// consistently order the functions so each compile produces the same numbering
73    pub fn sort_functions(&mut self) {
74        self.functions.sort_by_key(|f| f.get_id());
75    }
76
77
78    /// Construct two look-up tables that can be used to find the index of a function in the functions table,
79    /// and the index of it's input - using the input route or it's output route
80    pub fn create_routes_table(&mut self) {
81        for function in &mut self.functions {
82            // Add inputs to functions to the table as a possible source of connections from a
83            // job that completed using this function
84            for (input_number, input) in function.get_inputs().iter().enumerate() {
85                self.sources.insert(
86                    input.route().clone(),
87                    (Input(input_number), function.get_id()),
88                );
89            }
90
91            // Add any output routes it has to the source routes table
92            for output in function.get_outputs() {
93                self.sources.insert(
94                    output.route().clone(),
95                    (Output(output.name().to_string()), function.get_id()),
96                );
97            }
98
99            // Add any inputs it has to the destination routes table
100            for (input_index, input) in function.get_inputs().iter().enumerate() {
101                self.destination_routes.insert(
102                    input.route().clone(),
103                    (function.get_id(), input_index, function.get_flow_id()),
104                );
105            }
106        }
107    }
108}
109
110/// Take a hierarchical flow definition in memory and compile it, generating a manifest for execution
111/// of the flow, including references to libraries required.
112pub fn compile(flow: &FlowDefinition,
113               output_dir: &Path,
114               skip_building: bool,
115               optimize: bool,
116               #[cfg(feature = "debugger")]
117                source_urls: &mut BTreeMap<String, Url>
118    ) -> Result<CompilerTables> {
119    let mut tables = CompilerTables::new();
120
121    gatherer::gather_functions_and_connections(flow, &mut tables)?;
122    gatherer::collapse_connections(&mut tables)?;
123    if optimize {
124        optimizer::optimize(&mut tables);
125    }
126    checker::check_function_inputs(&tables)?;
127    checker::check_side_effects(&tables)?;
128    configure_output_connections(&mut tables)?;
129    compile_supplied_implementations(
130        output_dir,
131        &mut tables,
132        skip_building,
133        optimize,
134        source_urls,
135    ).chain_err(|| "Could not compile to wasm the flow's supplied implementation(s)")?;
136
137    Ok(tables)
138}
139
140/// Calculate the paths to the source file of the implementation of the function to be compiled
141/// and where to output the compiled wasm.
142/// out_dir optionally overrides the destination directory where the wasm should end up
143pub fn get_paths(wasm_output_dir: &Path, function: &FunctionDefinition) -> Result<(PathBuf, PathBuf)> {
144    let source_url = function.get_source_url().join(function.get_source())?;
145
146    let source_path = source_url
147        .to_file_path()
148        .map_err(|_| "Could not convert source url to file path")?;
149
150    let mut wasm_path = wasm_output_dir.join(function.get_source());
151    wasm_path.set_extension("wasm");
152
153    Ok((source_path, wasm_path))
154}
155
156// For any function that provides an implementation - compile the source to wasm and modify the
157// implementation to indicate it is the wasm file
158fn compile_supplied_implementations(
159    out_dir: &Path,
160    tables: &mut CompilerTables,
161    skip_building: bool,
162    release_build: bool,
163    #[cfg(feature = "debugger")]
164    source_urls: &mut BTreeMap<String, Url>
165) -> Result<String> {
166    for function in &mut tables.functions {
167        if function.get_lib_reference().is_none() && function.get_context_reference().is_none() {
168            let (implementation_source_path, wasm_destination) = get_paths(out_dir, function)?;
169            let mut cargo_target_dir = implementation_source_path.parent()
170                .ok_or("Could not get directory where Cargo.toml resides")?.to_path_buf();
171            if release_build {
172                cargo_target_dir.push("target/wasm32-unknown-unknown/release/");
173            } else {
174                cargo_target_dir.push("target/wasm32-unknown-unknown/debug/");
175            }
176
177            compile_wasm::compile_implementation(
178                out_dir,
179                cargo_target_dir,
180                &wasm_destination,
181                &implementation_source_path,
182                function,
183                skip_building,
184                release_build,
185                #[cfg(feature = "debugger")]
186                source_urls,
187            )?;
188        }
189    }
190
191    Ok("All supplied implementations compiled successfully".into())
192}
193
194// Go through all connections, finding:
195// - source function (function id and the output route the connection is from)
196// - destination function (function id and input number the connection is to)
197//
198// Then add an output route to the source function's list of output routes
199// (according to each function's output route in the original description plus each connection from
200// that route, which could be to multiple destinations)
201fn configure_output_connections(tables: &mut CompilerTables) -> Result<()> {
202    info!("\n=== Compiler: Configuring Output Connections");
203    for connection in &tables.collapsed_connections {
204        let (source, source_id) = get_source(&tables.sources,
205                                             connection.from_io().route())
206            .ok_or(format!("Connection source for route '{}' was not found",
207                           connection.from_io().route()))?;
208
209        let (destination_function_id, destination_input_index, destination_flow_id) =
210            tables.destination_routes.get(connection.to_io().route())
211                .ok_or(format!("Connection destination for route '{}' was not found",
212                               connection.to_io().route()))?;
213
214        let source_function = tables.functions.get_mut(source_id)
215            .ok_or(format!("Could not find function with id: {source_id} \
216            while configuring output connection '{connection}'"))?;
217
218        debug!(
219            "Connection: from '{}' to '{}'",
220            &connection.from_io().route(),
221            &connection.to_io().route()
222        );
223        debug!("  Source output route = '{}' --> function #{}:{}",
224               source, destination_function_id, destination_input_index);
225
226        let output_conn = OutputConnection::new(
227            source,
228            *destination_function_id,
229            *destination_input_index,
230            *destination_flow_id,
231            connection.to_io().route().to_string(),
232            #[cfg(feature = "debugger")]
233                connection.name().to_string(),
234        );
235        source_function.add_output_connection(output_conn);
236    }
237
238    info!("Output Connections set on all functions");
239
240    Ok(())
241}
242
243/*
244    Find a Function's IO using a route to it or subroute of it
245    Return an Option:
246        None --> The IO was not found
247        Some (subroute, function_index) with:
248        - The subroute of the IO relative to the function it belongs to
249        - The function's index in the compiler's tables
250    -  (removing the array index first to find outputs that are arrays, but then adding it back into the subroute) TODO change
251*/
252fn get_source(
253    source_routes: &BTreeMap<Route, (Source, usize)>,
254    from_route: &Route,
255) -> Option<(Source, usize)> {
256    let mut source_route = from_route.clone();
257    let mut sub_route = Route::from("");
258
259    // Look for a function/output or function/input with a route that matches what we are looking for
260    // popping off sub-structure sub-path segments until none left
261    loop {
262        match source_routes.get(&source_route) {
263            Some((Output(io_sub_route), function_index)) => {
264                return if io_sub_route.is_empty() {
265                    Some((Output(format!("{sub_route}")), *function_index))
266                } else {
267                    Some((
268                        Output(format!("/{io_sub_route}{sub_route}")),
269                        *function_index,
270                    ))
271                }
272            }
273            Some((Input(io_index), function_index)) => {
274                return Some((Input(*io_index), *function_index));
275            }
276            _ => {}
277        }
278
279        // pop a route segment off the source route - if there are any left
280        match source_route.pop() {
281            (_, None) => break,
282            (parent, Some(sub)) => {
283                source_route = parent.into_owned();
284                // insert new route segment at the start of the sub_route we are building up
285                sub_route.insert(sub);
286                sub_route.insert("/");
287            }
288        }
289    }
290
291    None
292}
293
#[cfg(test)]
mod test {
    // `BTreeMap` is needed with or without the "debugger" feature (for the initializations of
    // a `ProcessReference`), so it must not be imported conditionally
    use std::collections::BTreeMap;
    use std::path::Path;

    use tempdir::TempDir;
    use url::Url;

    use flowcore::model::datatype::STRING_TYPE;
    use flowcore::model::flow_definition::FlowDefinition;
    use flowcore::model::function_definition::FunctionDefinition;
    use flowcore::model::io::IO;
    use flowcore::model::name::{HasName, Name};
    use flowcore::model::output_connection::{OutputConnection, Source};
    use flowcore::model::process_reference::ProcessReference;
    use flowcore::model::route::Route;

    use crate::compiler::compile::{compile, get_paths};

    mod get_source_tests {
        use std::collections::BTreeMap;

        use flowcore::model::output_connection::Source;
        use flowcore::model::output_connection::Source::Output;
        use flowcore::model::route::Route;

        use super::super::get_source;

        /*
            Create a table of routes for use in tests.
            Each entry (K, V) is:
            - Key   - the route to a function's IO
            - Value - a tuple of
                        - sub-route (or IO name) from the function to be used at runtime
                        - the id number of the function in the functions table, to select it at runtime

            Plus a vector of test cases, each with a description, the Route to search for and the
            expected source (output sub-route) and function_id, or None if nothing should be found
        */
        // The return type is only used here, so a type alias would not make it any clearer
        #[allow(clippy::type_complexity)]
        fn test_source_routes() -> (
            BTreeMap<Route, (Source, usize)>,
            Vec<(&'static str, Route, Option<(Source, usize)>)>,
        ) {
            // make sure a corresponding entry (if applicable) is in the table to give the expected response
            let mut test_sources = BTreeMap::<Route, (Source, usize)>::new();
            test_sources.insert(Route::from("/root/f1"), (Source::default(), 0));
            test_sources.insert(
                Route::from("/root/f2/output_value"),
                (Output("output_value".into()), 1),
            );
            test_sources.insert(
                Route::from("/root/f2/output_value_2"),
                (Output("output_value_2".into()), 2),
            );

            // The test cases and expected responses
            //   Description, Input: Test Route, Output: (Subroute, Function ID)
            let test_cases: Vec<(&'static str, Route, Option<(Source, usize)>)> = vec![
                (
                    "the default IO",
                    Route::from("/root/f1"),
                    Some((Source::default(), 0)),
                ),
                (
                    "array element selected from the default output",
                    Route::from("/root/f1/1"),
                    Some((Output("/1".into()), 0)),
                ),
                (
                    "correctly named IO",
                    Route::from("/root/f2/output_value"),
                    Some((Output("/output_value".into()), 1)),
                ),
                (
                    "incorrectly named function",
                    Route::from("/root/f2b"),
                    None,
                ),
                (
                    "incorrectly named IO",
                    Route::from("/root/f2/output_fake"),
                    None,
                ),
                (
                    "the default IO of a function (which does not exist)",
                    Route::from("/root/f2"),
                    None,
                ),
                (
                    "subroute to part of non-existent function",
                    Route::from("/root/f0/sub_struct"),
                    None,
                ),
                (
                    "subroute to part of a function's default output's structure",
                    Route::from("/root/f1/sub_struct"),
                    Some((Output("/sub_struct".into()), 0)),
                ),
                (
                    "subroute to an array element from part of output's structure",
                    Route::from("/root/f1/sub_array/1"),
                    Some((Output("/sub_array/1".into()), 0)),
                ),
            ];

            (test_sources, test_cases)
        }

        #[test]
        fn test_get_source() {
            let (test_sources, test_cases) = test_source_routes();

            for (description, route, expected) in test_cases {
                let found = get_source(&test_sources, &route);
                // Name the case in the failure message so it can be identified
                assert_eq!(found, expected, "Test case failed: {description}");
            }
        }
    }

    /*
        Test an error is thrown if a flow has no side effects, and that unconnected functions
       are removed by the optimizer
    */
    #[test]
    fn no_side_effects() {
        let function = FunctionDefinition::new(
            Name::from("Stdout"),
            false,
            "context://stdio/stdout.toml".to_owned(),
            Name::from("test-function"),
            vec![IO::new(vec!(STRING_TYPE.into()), "/print")],
            vec![],
            Url::parse("context://stdio/stdout.toml").expect("Could not parse Url"),
            Route::from("/print"),
            None,
            Some(Url::parse("context://stdio/stdout.toml")
                     .expect("Could not parse Url")),
            vec![],
            0,
            0,
        );

        let function_ref = ProcessReference {
            alias: function.alias().to_owned(),
            source: function.get_source_url().to_string(),
            initializations: BTreeMap::new(),
        };

        let flow = FlowDefinition {
            alias: Name::from("root"),
            name: Name::from("test-flow"),
            process_refs: vec![function_ref],
            source_url: FlowDefinition::default_url(),
            ..Default::default()
        };

        let output_dir = TempDir::new("flow-test").expect("A temp dir").into_path();
        // Only needed (and only passed to `compile`) when the "debugger" feature is enabled
        #[cfg(feature = "debugger")]
        let mut source_urls = BTreeMap::<String, Url>::new();
        // Optimizer should remove unconnected function leaving no side-effects
        // NOTE(review): `optimize` (the 4th argument) is passed as `false` here, so the optimizer
        // is not run by `compile` in this test - confirm which is intended
        match compile(&flow,
                      &output_dir,
                      true,
                      false,
                      #[cfg(feature = "debugger")]
                        &mut source_urls
                        ) {
            Ok(_tables) => panic!("Flow should not compile when it has no side-effects"),
            Err(e) => assert_eq!("Flow has no side-effects", e.description()),
        }
    }

    // Create a function definition, with a "test.rs" source located relative to this crate's
    // manifest directory, for use in tests
    fn test_function() -> FunctionDefinition {
        FunctionDefinition::new(
            "Stdout".into(),
            false,
            "test.rs".to_string(),
            "print".into(),
            vec![IO::new(vec!(STRING_TYPE.into()), Route::default())],
            vec![IO::new(vec!(STRING_TYPE.into()), Route::default())],
            Url::parse(&format!(
                "file://{}/{}",
                env!("CARGO_MANIFEST_DIR"),
                "tests/test-functions/test/test"
            ))
                .expect("Could not create source Url"),
            Route::from("/flow0/stdout"),
            Some(Url::parse("lib::/tests/test-functions/test/test")
                .expect("Could not parse Url")),
            None,
            vec![OutputConnection::new(
                Source::default(),
                1,
                0,
                0,
                String::default(),
                #[cfg(feature = "debugger")]
                    String::default(),
            )],
            0,
            0,
        )
    }

    #[test]
    fn paths_test() {
        let function = test_function();

        let target_dir = TempDir::new("flow")
            .expect("Could not create TempDir during testing")
            .into_path();
        let expected_output_wasm = target_dir.join("test.wasm");

        let (impl_source_path, impl_wasm_path) =
            get_paths(&target_dir, &function).expect("Error in 'get_paths'");

        // The function's source url is built from CARGO_MANIFEST_DIR, so the expected path is
        // built from it too, without hard-coding the name of the directory the crate is in
        let expected_source_path = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("tests/test-functions/test/test.rs");

        assert_eq!(expected_source_path, impl_source_path);
        assert_eq!(expected_output_wasm, impl_wasm_path);
    }
}