flowr 1.0.0

Runners for compiled 'flow' programs
Documentation
use std::collections::HashMap;
use std::fs::File;
use std::io;
use std::io::prelude::*;
use std::path::Path;
use std::sync::{Arc, Mutex};

use image::{ImageBuffer, ImageFormat, Rgb, RgbImage};
use log::debug;
#[cfg(feature = "debugger")]
use log::error;
#[cfg(feature = "debugger")]
use log::info;

use flowcore::errors::Result;

use crate::cli::connections::ClientConnection;
use crate::cli::coordinator_message::{ClientMessage, CoordinatorMessage};

/// Fallback used as the first argument (the flow name) when override args are applied
/// but the client was created with an empty argument list.
const DEFAULT_NAME: &str = "unknown";

/// Client side of the CLI runtime: it receives `CoordinatorMessage`s and answers each one
/// with a `ClientMessage`, performing the requested stdio, file and image operations.
#[derive(Debug, Clone)]
pub struct CliRuntimeClient {
    /// Arguments returned in response to `GetArgs`; the first entry is treated as the flow name
    args: Vec<String>,
    /// Shared list of arguments that, when non-empty, replaces every argument after the first
    override_args: Arc<Mutex<Vec<String>>>,
    /// Images being built up pixel by pixel, keyed by name; the name is used as the file path
    /// when the buffers are saved at the end of the flow
    image_buffers: HashMap<String, ImageBuffer<Rgb<u8>, Vec<u8>>>,
    /// Whether the metrics received with `FlowEnd` are printed to stdout
    #[cfg(feature = "metrics")]
    display_metrics: bool,
}

impl CliRuntimeClient {
    /// Create a new runtime client
    pub fn new(
        args: Vec<String>,
        override_args: Arc<Mutex<Vec<String>>>,
        #[cfg(feature = "metrics")] display_metrics: bool,
    ) -> Self {
        CliRuntimeClient {
            args,
            override_args,
            image_buffers: HashMap::<String, ImageBuffer<Rgb<u8>, Vec<u8>>>::new(),
            #[cfg(feature = "metrics")]
            display_metrics,
        }
    }

    /// Enter a loop where we receive events as a client and respond to them
    pub fn event_loop(mut self, connection: &ClientConnection) -> Result<()> {
        loop {
            let event = connection.receive()?;
            let response = self.process_coordinator_message(event);
            if let ClientMessage::ClientExiting(coordinator_result) = response {
                debug!("Client is exiting the event loop.");
                return coordinator_result;
            }
            let _ = connection.send(response);
        }
    }

    fn flush_image_buffers(&mut self) {
        for (filename, image_buffer) in self.image_buffers.drain() {
            info!("Flushing ImageBuffer to file: {filename}");
            if let Err(e) = image_buffer.save_with_format(Path::new(&filename), ImageFormat::Png) {
                error!("Error saving ImageBuffer '{filename}': '{e}'");
            }
        }
    }

    #[allow(clippy::too_many_lines)]
    #[allow(clippy::many_single_char_names)]
    fn process_coordinator_message(&mut self, message: CoordinatorMessage) -> ClientMessage {
        match message {
            #[cfg(feature = "metrics")]
            CoordinatorMessage::FlowEnd(metrics) => {
                debug!("=========================== Flow execution ended ======================================");
                if self.display_metrics {
                    println!("\nMetrics: \n{metrics}");
                    let _ = io::stdout().flush();
                }

                self.flush_image_buffers();
                ClientMessage::ClientExiting(Ok(()))
            }

            #[cfg(not(feature = "metrics"))]
            CoordinatorMessage::FlowEnd => {
                debug!("=========================== Flow execution ended ======================================");
                self.flush_image_buffers();
                ClientMessage::ClientExiting(Ok(()))
            }
            CoordinatorMessage::FlowStart => {
                debug!("===========================    Starting flow execution =============================");
                ClientMessage::Ack
            }
            CoordinatorMessage::CoordinatorExiting(result) => {
                debug!("Coordinator is exiting");
                ClientMessage::ClientExiting(result)
            }
            CoordinatorMessage::StdoutEof => ClientMessage::Ack,
            CoordinatorMessage::Stdout(contents) => {
                let stdout = io::stdout();
                let mut handle = stdout.lock();
                let _ = handle.write_all(format!("{contents}\n").as_bytes());
                let _ = io::stdout().flush();
                ClientMessage::Ack
            }
            CoordinatorMessage::StderrEof => ClientMessage::Ack,
            CoordinatorMessage::Stderr(contents) => {
                let stderr = io::stderr();
                let mut handle = stderr.lock();
                let _ = handle.write_all(format!("{contents}\n").as_bytes());
                let _ = io::stdout().flush();
                ClientMessage::Ack
            }
            CoordinatorMessage::GetStdin => {
                let mut buffer = String::new();
                if let Ok(size) = io::stdin().read_to_string(&mut buffer) {
                    return if size > 0 {
                        ClientMessage::Stdin(buffer.trim().to_string())
                    } else {
                        ClientMessage::GetStdinEof
                    };
                }
                ClientMessage::Error("Could not read Stdin".into())
            }
            CoordinatorMessage::GetLine(_prompt) => {
                let mut input = String::new();
                let line = io::stdin().lock().read_line(&mut input);
                match line {
                    Ok(n) if n > 0 => ClientMessage::Line(input.trim().to_string()),
                    Ok(0) => ClientMessage::GetLineEof,
                    _ => ClientMessage::Error("Could not read Readline".into()),
                }
            }
            CoordinatorMessage::Read(file_path) => match File::open(&file_path) {
                Ok(mut f) => {
                    let mut buffer = Vec::new();
                    match f.read_to_end(&mut buffer) {
                        Ok(_) => ClientMessage::FileContents(file_path, buffer),
                        Err(_) => ClientMessage::Error(format!(
                            "Could not read content from '{file_path:?}'"
                        )),
                    }
                }
                Err(_) => ClientMessage::Error(format!("Could not open file '{file_path:?}'")),
            },
            CoordinatorMessage::Write(filename, bytes) => match File::create(&filename) {
                Ok(mut file) => match file.write_all(bytes.as_slice()) {
                    Ok(()) => ClientMessage::Ack,
                    Err(e) => {
                        let msg = format!("Error writing to file: '{filename}': '{e}'");
                        error!("{msg}");
                        ClientMessage::Error(msg)
                    }
                },
                Err(e) => {
                    let msg = format!("Error creating file: '{filename}': '{e}'");
                    error!("{msg}");
                    ClientMessage::Error(msg)
                }
            },
            #[allow(clippy::many_single_char_names)]
            CoordinatorMessage::PixelWrite((x, y), (r, g, b), (width, height), name) => {
                let image = self
                    .image_buffers
                    .entry(name)
                    .or_insert_with(|| RgbImage::new(width, height));
                image.put_pixel(x, y, Rgb([r, g, b]));
                ClientMessage::Ack
            }
            CoordinatorMessage::ImageWrite(grid, name) => {
                let height = u32::try_from(grid.len()).unwrap_or(0);
                let width = grid
                    .first()
                    .map_or(0, |row| u32::try_from(row.len()).unwrap_or(0));
                let image = self
                    .image_buffers
                    .entry(name)
                    .or_insert_with(|| RgbImage::new(width, height));
                for (y, row) in grid.iter().enumerate() {
                    for (x, &val) in row.iter().enumerate() {
                        let gray = val;
                        image.put_pixel(
                            u32::try_from(x).unwrap_or(0),
                            u32::try_from(y).unwrap_or(0),
                            Rgb([gray, gray, gray]),
                        );
                    }
                }
                ClientMessage::Ack
            }
            CoordinatorMessage::GetArgs => {
                if let Ok(override_args) = self.override_args.lock() {
                    if override_args.is_empty() {
                        ClientMessage::Args(self.args.clone())
                    } else {
                        // we want to retain arg[0] which is the flow name and replace all others
                        // with the override args supplied
                        let arg_zero = self
                            .args
                            .first()
                            .unwrap_or(&DEFAULT_NAME.to_string())
                            .to_owned();
                        let mut one_time_args = vec![arg_zero];
                        one_time_args.append(&mut override_args.to_vec());
                        ClientMessage::Args(one_time_args)
                    }
                } else {
                    ClientMessage::Args(self.args.clone())
                }
            }
            CoordinatorMessage::Invalid => ClientMessage::Ack,
        }
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod test {
    use std::fs;
    use std::fs::File;
    use std::io::prelude::*;
    use std::sync::{Arc, Mutex};

    use tempfile::tempdir;

    #[cfg(feature = "metrics")]
    use flowcore::model::metrics::Metrics;

    use crate::cli::coordinator_message::{ClientMessage, CoordinatorMessage};

    use super::CliRuntimeClient;

    /// Flow name used as the first argument of every test client
    const FLOW_NAME: &str = "file:///test_flow.toml";

    /// Create a client whose args are `FLOW_NAME` followed by `extra_args`, sharing
    /// `override_args` with the caller and with metrics display turned off.
    fn new_client(extra_args: &[&str], override_args: Arc<Mutex<Vec<String>>>) -> CliRuntimeClient {
        let mut args = vec![FLOW_NAME.to_string()];
        args.extend(extra_args.iter().map(|arg| (*arg).to_string()));
        CliRuntimeClient::new(
            args,
            override_args,
            #[cfg(feature = "metrics")]
            false,
        )
    }

    /// Create a client with only the flow name as argument and no override args
    fn default_client() -> CliRuntimeClient {
        new_client(&[], Arc::new(Mutex::new(vec![])))
    }

    #[test]
    fn test_arg_passing() {
        let mut client = new_client(&["1"], Arc::new(Mutex::new(vec![])));

        match client.process_coordinator_message(CoordinatorMessage::GetArgs) {
            ClientMessage::Args(args) => {
                assert_eq!(vec![FLOW_NAME.to_string(), "1".to_string()], args);
            }
            _ => panic!("Didn't get Args response as expected"),
        }
    }

    #[test]
    fn test_arg_overriding() {
        let override_args = Arc::new(Mutex::new(vec![]));
        let mut client = new_client(&["1"], override_args.clone());

        // Scope the guard so the lock is released before the client takes it
        {
            let mut overrides = override_args.lock().expect("Could not lock override args");
            overrides.push("override".into());
        }

        match client.process_coordinator_message(CoordinatorMessage::GetArgs) {
            ClientMessage::Args(args) => {
                assert_eq!(vec![FLOW_NAME.to_string(), "override".to_string()], args);
            }
            _ => panic!("Args override response was not as expected"),
        }
    }

    #[test]
    fn test_file_reading() {
        let test_contents = b"The quick brown fox jumped over the lazy dog";

        // Keep the `TempDir` alive for the whole test; it is removed when dropped
        let temp = tempdir().expect("Couldn't get temporary directory");
        let file_path = temp.path().join("test_read").to_string_lossy().to_string();
        {
            let mut file = File::create(&file_path).expect("Could not create test file");
            file.write_all(test_contents)
                .expect("Could not write to test file");
        }
        let mut client = default_client();

        match client.process_coordinator_message(CoordinatorMessage::Read(file_path.clone())) {
            ClientMessage::FileContents(path_read, contents) => {
                assert_eq!(path_read, file_path);
                assert_eq!(contents, test_contents);
            }
            _ => panic!("Didn't get Read response as expected"),
        }
    }

    #[test]
    fn test_file_writing() {
        let temp = tempdir().expect("Couldn't get temporary directory");
        let file = temp.path().join("test");

        let mut client = default_client();

        let response = client.process_coordinator_message(CoordinatorMessage::Write(
            file.to_str().expect("Couldn't get filename").to_string(),
            b"Hello".to_vec(),
        ));
        assert!(
            matches!(response, ClientMessage::Ack),
            "Didn't get Write response as expected"
        );

        // An Ack alone does not prove the bytes reached the file, so read them back
        let written = fs::read(&file).expect("Could not read back written file");
        assert_eq!(written, b"Hello");
    }

    #[test]
    fn test_stdout() {
        let mut client = default_client();
        let response =
            client.process_coordinator_message(CoordinatorMessage::Stdout("Hello".into()));
        assert!(
            matches!(response, ClientMessage::Ack),
            "Didn't get Stdout response as expected"
        );
    }

    #[test]
    fn test_stderr() {
        let mut client = default_client();
        let response =
            client.process_coordinator_message(CoordinatorMessage::Stderr("Hello".into()));
        assert!(
            matches!(response, ClientMessage::Ack),
            "Didn't get Stderr response as expected"
        );
    }

    #[test]
    fn test_image_writing() {
        let mut client = default_client();

        let temp_dir = tempdir().expect("Couldn't get temporary directory");
        let path = temp_dir.path().join("flow.png");

        let _ = fs::remove_file(&path);
        assert!(!path.exists());

        client.process_coordinator_message(CoordinatorMessage::FlowStart);
        let pixel = CoordinatorMessage::PixelWrite(
            (0, 0),
            (255, 200, 20),
            (10, 10),
            path.display().to_string(),
        );
        assert!(
            matches!(
                client.process_coordinator_message(pixel),
                ClientMessage::Ack
            ),
            "Didn't get pixel write response as expected"
        );

        // The image is only written to disk when the flow ends
        #[cfg(not(feature = "metrics"))]
        client.process_coordinator_message(CoordinatorMessage::FlowEnd);
        #[cfg(feature = "metrics")]
        client.process_coordinator_message(CoordinatorMessage::FlowEnd(Metrics::new(1)));

        assert!(path.exists(), "Image file was not created");
    }

    #[test]
    fn coordinator_exiting() {
        let mut client = default_client();

        let response =
            client.process_coordinator_message(CoordinatorMessage::CoordinatorExiting(Ok(())));
        assert!(
            matches!(response, ClientMessage::ClientExiting(_)),
            "Didn't get ClientExiting response as expected"
        );
    }
}