#![deny(clippy::unwrap_used, clippy::expect_used)]
use core::str::FromStr;
use std::fs::File;
use std::io::prelude::*;
use std::path::PathBuf;
use std::{env, process, thread};
use clap::Command as ClapCommand;
use clap::{Arg, ArgMatches};
use env_logger::Builder;
use iced::widget::operation::{self, RelativeOffset};
use iced::widget::{center, mouse_area, opaque, stack, text_input, Button, Column, Id, Row, Text};
use iced::{Center, Element, Fill, Subscription, Task};
use iced_aw::Card;
use image::{ImageBuffer, Rgba, RgbaImage};
use log::{debug, info, trace, LevelFilter};
use simpath::Simpath;
use url::Url;
use flowcore::meta_provider::MetaProvider;
use flowcore::model::flow_manifest::FlowManifest;
use flowcore::model::submission::Submission;
use flowcore::provider::Provider;
use flowcore::url_helper::url_from_string;
use flowrlib::info as flowrlib_info;
use gui::coordinator_connection::CoordinatorConnection;
use gui::debug_message::DebugServerMessage;
use gui::debug_message::DebugServerMessage::{
BlockBreakpoint, DataBreakpoint, ExecutionEnded, ExecutionStarted, ExitingDebugger,
JobCompleted, JobError, Panic, PriorToSendingJob, Resetting, WaitingForCommand,
};
use crate::gui::client_message::ClientMessage;
use crate::gui::coordinator_message::CoordinatorMessage;
use crate::tabs::TabSet;
mod context;
mod gui;
mod connection_manager;
mod tabs;
mod errors;
mod theme;
#[derive(Debug, Clone)]
pub enum Message {
CoordinatorDisconnected(String),
CoordinatorSent(CoordinatorMessage),
SubmitFlow, Submitted,
SubmitError(String),
UrlChanged(String),
FlowArgsChanged(String),
MaxJobsChanged(String),
DebugSubmitFlow,
TabSelected(usize),
NewStdin(String),
LineOfStdin(String),
SendEof,
StdioAutoScrollTogglerChanged(Id, bool),
StopFlow,
ClearTab(String),
SaveTabContent(String),
SaveImage(String),
SaveError(String),
CloseModal,
}
#[allow(clippy::ignored_unit_patterns)]
enum CoordinatorState {
Disconnected(String),
Connected(tokio::sync::mpsc::Sender<ClientMessage>),
}
fn dark_mode_enabled() -> bool {
#[cfg(target_os = "macos")]
{
std::process::Command::new("defaults")
.args(["read", "-g", "AppleInterfaceStyle"])
.output()
.map_or(true, |o| {
String::from_utf8_lossy(&o.stdout).contains("Dark")
})
}
#[cfg(not(target_os = "macos"))]
{
true
}
}
fn main() -> iced::Result {
iced::application(FlowrGui::new, FlowrGui::update, FlowrGui::view)
.subscription(FlowrGui::subscription)
.title(FlowrGui::title)
.theme(FlowrGui::theme)
.antialiasing(true)
.run()
}
#[derive(Clone)]
struct SubmissionSettings {
flow_manifest_url: String,
flow_args: String,
max_jobs_text: String,
debug_this_flow: bool,
display_metrics: bool,
parallel_jobs_limit: Option<usize>,
}
#[derive(Clone)]
pub struct ServerSettings {
native_flowstdlib: bool,
num_threads: usize,
lib_search_path: Simpath,
}
#[derive(Clone)]
pub enum CoordinatorSettings {
Server(ServerSettings),
ClientOnly(u16),
}
struct UiSettings {
auto_start: bool,
auto_exit: bool,
}
struct ImageReference {
pub width: u32,
pub height: u32,
pub data: ImageBuffer<Rgba<u8>, Vec<u8>>,
}
#[allow(clippy::struct_excessive_bools)]
struct FlowrGui {
submission_settings: SubmissionSettings,
coordinator_settings: CoordinatorSettings,
ui_settings: UiSettings,
coordinator_state: CoordinatorState,
tab_set: TabSet,
running: bool,
submitted: bool,
show_modal: bool,
modal_content: (String, String),
pending_getline: bool,
}
impl FlowrGui {
fn new() -> (Self, Task<Message>) {
let settings = FlowrGui::initial_settings();
let tab_set = TabSet::new();
let flowrgui = FlowrGui {
submission_settings: settings.0,
coordinator_settings: settings.1,
ui_settings: settings.2,
coordinator_state: CoordinatorState::Disconnected("Starting".into()),
tab_set,
submitted: false,
running: false,
show_modal: false,
modal_content: (String::new(), String::new()),
pending_getline: false,
};
(flowrgui, Task::none())
}
#[allow(clippy::unused_self)]
fn title(&self) -> String {
String::from("flowrgui")
}
#[allow(clippy::unused_self)]
fn theme(&self) -> iced::Theme {
if dark_mode_enabled() {
iced::Theme::CatppuccinMocha
} else {
iced::Theme::CatppuccinLatte
}
}
fn update(&mut self, message: Message) -> Task<Message> {
match message {
Message::CoordinatorSent(CoordinatorMessage::Connected(sender)) => {
self.coordinator_state = CoordinatorState::Connected(sender);
if self.ui_settings.auto_start {
return Task::perform(Self::auto_submit(), |()| Message::SubmitFlow);
}
}
Message::SubmitFlow => {
if let CoordinatorState::Connected(sender) = &self.coordinator_state {
return Task::perform(
Self::submit(sender.clone(), self.submission_settings.clone()),
|result| match result {
Ok(()) => Message::Submitted,
Err(msg) => Message::SubmitError(msg),
},
);
}
}
Message::Submitted => {
self.tab_set.clear();
self.tab_set.flow_name =
std::path::Path::new(&self.submission_settings.flow_manifest_url)
.parent()
.and_then(|p| p.file_name())
.and_then(|n| n.to_str())
.unwrap_or("")
.to_string();
self.submitted = true;
}
Message::SubmitError(msg) | Message::SaveError(msg) => {
self.show_modal = true;
self.modal_content = ("Error".into(), msg);
}
Message::StopFlow => {
connection_manager::request_stop();
}
Message::FlowArgsChanged(value) => self.submission_settings.flow_args = value,
Message::MaxJobsChanged(value) => {
self.submission_settings.parallel_jobs_limit = value.trim().parse::<usize>().ok();
self.submission_settings.max_jobs_text = value;
}
Message::DebugSubmitFlow => {
if let CoordinatorState::Connected(sender) = &self.coordinator_state {
let mut settings = self.submission_settings.clone();
settings.debug_this_flow = true;
return Task::perform(Self::submit(sender.clone(), settings), |result| {
match result {
Ok(()) => Message::Submitted,
Err(msg) => Message::SubmitError(msg),
}
});
}
}
Message::UrlChanged(value) => self.submission_settings.flow_manifest_url = value,
Message::TabSelected(_)
| Message::StdioAutoScrollTogglerChanged(_, _)
| Message::ClearTab(_)
| Message::SaveTabContent(_)
| Message::SaveImage(_) => {
return self.tab_set.update(message);
}
Message::CoordinatorSent(coord_msg) => {
return self.process_coordinator_message(coord_msg);
}
Message::CloseModal => self.show_modal = false,
Message::CoordinatorDisconnected(reason) => {
self.coordinator_state = CoordinatorState::Disconnected(reason);
}
Message::NewStdin(text) => self.tab_set.stdin_tab.text_entered(text),
Message::LineOfStdin(line) => {
debug!("LineOfStdin: user entered line ({} chars)", line.len());
self.tab_set.stdin_tab.new_line(line);
if self.pending_getline {
if let Some(line) = self.tab_set.stdin_tab.get_line() {
debug!(
"LineOfStdin: responding to pending GetLine ({} chars)",
line.len()
);
self.send(ClientMessage::Line(line));
}
self.pending_getline = false;
self.tab_set.stdin_tab.waiting_for_input = false;
}
}
Message::SendEof => {
debug!("SendEof: user clicked EOF button");
if self.pending_getline {
debug!("SendEof: responding to pending GetLine with EOF");
self.send(ClientMessage::GetLineEof);
self.pending_getline = false;
self.tab_set.stdin_tab.waiting_for_input = false;
} else {
self.tab_set.stdin_tab.eof_signaled = true;
}
}
}
Task::none()
}
fn view(&self) -> Element<'_, Message> {
let main_content = Column::new()
.spacing(12)
.push(self.command_row())
.push(self.tab_set.view())
.push(self.status_row())
.padding(16);
if self.show_modal {
let modal_card = Card::new(
Text::new(self.modal_content.clone().0),
Text::new(self.modal_content.clone().1),
)
.foot(
Row::new().spacing(10).padding(5).width(Fill).push(
Button::new(Text::new("OK").align_x(Center))
.width(Fill)
.on_press(Message::CloseModal),
),
)
.max_width(300.0);
stack![
main_content,
opaque(mouse_area(center(opaque(modal_card))).on_press(Message::CloseModal))
]
.into()
} else {
main_content.into()
}
}
fn subscription(&self) -> Subscription<Message> {
connection_manager::subscribe(self.coordinator_settings.clone())
.map(Message::CoordinatorSent)
}
}
impl FlowrGui {
#[allow(clippy::unused_async)]
async fn auto_submit() {
info!("Auto submitting flow");
}
async fn submit(
sender: tokio::sync::mpsc::Sender<ClientMessage>,
settings: SubmissionSettings,
) -> Result<(), String> {
let url = Self::flow_url(&settings.flow_manifest_url)
.map_err(|e| format!("Invalid flow URL '{}': {e}", settings.flow_manifest_url))?;
let provider = &MetaProvider::new(Simpath::new(""), PathBuf::default()) as &dyn Provider;
let (flow_manifest, _) = FlowManifest::load(provider, &url)
.map_err(|e| format!("Could not load flow manifest: {e}"))?;
let submission = Submission::new(
flow_manifest,
settings.parallel_jobs_limit,
None,
settings.debug_this_flow,
);
info!("Sending submission to Coordinator");
sender
.send(ClientMessage::ClientSubmission(Box::new(submission)))
.await
.map_err(|e| format!("Could not submit flow to coordinator: {e}"))
}
fn error(&mut self, msg: &str) {
self.show_modal = true;
self.modal_content = ("Error".into(), msg.to_string());
}
fn info(&mut self, msg: &str) {
self.show_modal = true;
self.modal_content = ("Info".into(), msg.to_string());
}
fn command_row(&self) -> Row<'_, Message> {
let url = text_input(
"Flow location (relative, or absolute)",
&self.submission_settings.flow_manifest_url,
)
.on_input(Message::UrlChanged)
.on_submit(Message::SubmitFlow);
let args = text_input(
"Space separated flow arguments",
&self.submission_settings.flow_args,
)
.on_submit(Message::SubmitFlow)
.on_input(Message::FlowArgsChanged)
.on_paste(Message::FlowArgsChanged);
let max_jobs = text_input("Max jobs", &self.submission_settings.max_jobs_text)
.on_input(Message::MaxJobsChanged)
.width(80);
let can_run = matches!(self.coordinator_state, CoordinatorState::Connected(_))
&& !self.running
&& !self.submitted;
let play = if self.running {
Button::new(Text::new("\u{23F9} Stop"))
.on_press(Message::StopFlow)
.style(theme::styled_button)
.padding([6, 16])
} else {
let mut btn = Button::new(Text::new("\u{25B6} Play"))
.style(theme::styled_button)
.padding([6, 16]);
if can_run {
btn = btn.on_press(Message::SubmitFlow);
}
btn
};
let mut debug_play = Button::new(Text::new("\u{1F41B} Debug"))
.style(theme::styled_button)
.padding([6, 16]);
if can_run {
debug_play = debug_play.on_press(Message::DebugSubmitFlow);
}
Row::new()
.spacing(10)
.padding(5)
.align_y(iced::alignment::Vertical::Center)
.push(url)
.push(args)
.push(max_jobs)
.push(play)
.push(debug_play)
}
fn status_row(&self) -> Row<'_, Message> {
let (indicator, status) = match &self.coordinator_state {
CoordinatorState::Disconnected(reason) => {
("\u{1F534}", format!("Disconnected({reason})"))
}
CoordinatorState::Connected(_) => match (self.submitted, self.running) {
(false, false) => ("\u{1F7E2}", "Ready".to_string()),
(_, true) => ("\u{1F535}", "Running".to_string()),
(true, false) => ("\u{1F7E1}", "Submitted".to_string()),
},
};
let mut row = Row::new()
.spacing(8)
.align_y(iced::alignment::Vertical::Center)
.push(Text::new(indicator))
.push(Text::new(status).size(13));
if self.running {
row = row
.push(Text::new(format!("Jobs: {}", connection_manager::get_job_count())).size(13));
}
row
}
fn initial_settings() -> (SubmissionSettings, CoordinatorSettings, UiSettings) {
let matches = Self::parse_cli_args();
let default = String::from("error");
let verbosity = matches.get_one::<String>("verbosity").unwrap_or(&default);
let level = LevelFilter::from_str(verbosity).unwrap_or(LevelFilter::Error);
let mut builder = Builder::from_default_env();
builder.filter_level(level).init();
info!(
"'{}' version {}",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION")
);
info!("'flowrlib' version {}", flowrlib_info::version());
let flow_manifest_url = matches
.get_one::<String>("flow-manifest")
.unwrap_or(&String::new())
.clone();
let flow_args = match matches.get_many::<String>("flow-args") {
Some(values) => {
println!("values {values:?}");
values
.map(std::string::ToString::to_string)
.collect::<Vec<String>>()
.join(" ")
}
None => String::new(),
};
let parallel_jobs_limit = matches
.get_one::<usize>("jobs")
.map(std::borrow::ToOwned::to_owned);
let debug_this_flow = matches.get_flag("debugger");
let coordinator_settings = if let Some(port) = matches.get_one::<u16>("client") {
CoordinatorSettings::ClientOnly(*port)
} else {
let lib_dirs = if matches.contains_id("lib_dir") {
if let Some(dirs) = matches.get_many::<String>("lib_dir") {
dirs.map(std::string::ToString::to_string).collect()
} else {
vec![]
}
} else {
vec![]
};
let lib_search_path = FlowrGui::lib_search_path(&lib_dirs);
let native_flowstdlib = matches.get_flag("native");
let num_threads = FlowrGui::num_threads(&matches);
let server_settings = ServerSettings {
native_flowstdlib,
num_threads,
lib_search_path,
};
CoordinatorSettings::Server(server_settings)
};
let auto = matches.get_flag("auto");
let auto_start = auto || matches.get_flag("auto-start");
(
SubmissionSettings {
flow_manifest_url,
flow_args,
max_jobs_text: parallel_jobs_limit.map_or(String::new(), |n| n.to_string()),
debug_this_flow,
display_metrics: matches.get_flag("metrics"),
parallel_jobs_limit,
},
coordinator_settings,
UiSettings {
auto_start,
auto_exit: auto,
},
)
}
fn parse_cli_args() -> ArgMatches {
let app = ClapCommand::new(env!("CARGO_PKG_NAME")).version(env!("CARGO_PKG_VERSION"));
let app = app.arg(
Arg::new("debugger")
.short('d')
.long("debugger")
.action(clap::ArgAction::SetTrue)
.help("Enable the debugger when running a flow"),
);
#[cfg(feature = "flowstdlib")]
let app = app.arg(
Arg::new("native")
.short('n')
.long("native")
.action(clap::ArgAction::SetTrue)
.help("Link with native (not WASM) version of flowstdlib"),
);
let app = app.arg(
Arg::new("client")
.short('c')
.long("client")
.number_of_values(1)
.value_parser(clap::value_parser!(u16))
.help("Launch only a client (no coordinator) to connect to a remote coordinator"),
);
let app = app.arg(
Arg::new("metrics")
.short('m')
.long("metrics")
.action(clap::ArgAction::SetTrue)
.help("Calculate metrics during flow execution and print them out when done"),
);
let app = app.arg(
Arg::new("auto")
.short('a')
.long("auto")
.action(clap::ArgAction::SetTrue)
.help("Run any flow specified automatically on start-up. Exit automatically."),
);
let app = app.arg(
Arg::new("auto-start")
.long("auto-start")
.action(clap::ArgAction::SetTrue)
.help("Run the flow automatically on start-up, but stay open for interaction."),
);
let app = app
.arg(Arg::new("jobs")
.short('j')
.long("jobs")
.number_of_values(1)
.value_parser(clap::value_parser!(usize))
.value_name("MAX_JOBS")
.help("Set maximum number of jobs that can be running in parallel)"))
.arg(Arg::new("lib_dir")
.short('L')
.long("libdir")
.num_args(0..)
.number_of_values(1)
.value_name("LIB_DIR|BASE_URL")
.help("Add a directory or base Url to the Library Search path"))
.arg(Arg::new("threads")
.short('t')
.long("threads")
.number_of_values(1)
.value_parser(clap::value_parser!(usize))
.value_name("THREADS")
.help("Set number of threads to use to execute jobs (min: 1, default: cores available)"))
.arg(Arg::new("verbosity")
.short('v')
.long("verbosity")
.number_of_values(1)
.value_name("VERBOSITY_LEVEL")
.help("Set verbosity level for output (trace, debug, info, warn, default: error)"))
.arg(Arg::new("flow-manifest")
.num_args(1)
.help("the file path of the 'flow' manifest file"))
.arg(Arg::new("flow-args")
.num_args(0..)
.trailing_var_arg(true)
.help("A list of arguments to pass to the flow."));
app.get_matches()
}
fn flow_url(flow_url_string: &str) -> flowcore::errors::Result<Url> {
let cwd_url = Url::from_directory_path(env::current_dir()?)
.map_err(|()| "Could not form a Url for the current working directory")?;
url_from_string(&cwd_url, Some(flow_url_string))
}
fn flow_arg_vec(&self) -> Vec<String> {
let mut flow_args: Vec<String> = vec![self.submission_settings.flow_manifest_url.clone()];
let additional_args: Vec<String> = self
.submission_settings
.flow_args
.split(' ')
.map(std::string::ToString::to_string)
.collect();
flow_args.extend(additional_args);
flow_args
}
fn lib_search_path(search_path_additions: &[String]) -> Simpath {
let mut lib_search_path = Simpath::new_with_separator("FLOW_LIB_PATH", ',');
for additions in search_path_additions {
lib_search_path.add(additions);
info!("'{additions}' added to the Library Search Path");
}
if lib_search_path.is_empty() {
let home_dir = env::var("HOME").unwrap_or_else(|_| "Could not get $HOME".to_string());
lib_search_path.add(&format!("{home_dir}/.flow/lib"));
}
lib_search_path
}
#[allow(clippy::redundant_closure_for_method_calls)]
fn num_threads(matches: &ArgMatches) -> usize {
match matches.get_one::<usize>("threads") {
Some(num_threads) => *num_threads,
None => thread::available_parallelism().map_or(1, |n| n.get()),
}
}
fn send(&mut self, msg: ClientMessage) {
if let CoordinatorState::Connected(ref sender) = self.coordinator_state {
let _ = sender.try_send(msg);
}
}
#[allow(clippy::too_many_lines)]
fn process_coordinator_message(&mut self, message: CoordinatorMessage) -> Task<Message> {
match message {
CoordinatorMessage::Connected(_) => {
self.error("Coordinator is already connected");
}
CoordinatorMessage::FlowStart => {
self.running = true;
self.submitted = false;
connection_manager::set_job_count(0);
self.send(ClientMessage::Ack);
}
CoordinatorMessage::FlowEnd(metrics) => {
self.running = false;
self.submitted = false;
self.pending_getline = false;
connection_manager::set_job_count(0);
self.tab_set.stdin_tab.waiting_for_input = false;
if self.submission_settings.display_metrics {
self.show_modal = true;
self.modal_content = ("Flow Ended - Metrics".into(), format!("{metrics}"));
}
if self.ui_settings.auto_exit {
self.info("Auto exiting on flow completion");
let _ = std::io::stdout().flush();
process::exit(0);
}
}
CoordinatorMessage::CoordinatorExiting(_) => {
self.coordinator_state = CoordinatorState::Disconnected("Exited".into());
self.send(ClientMessage::Ack);
}
CoordinatorMessage::Stdout(string) => {
if self.ui_settings.auto_exit {
println!("{string}");
}
self.tab_set.stdout_tab.content.push(string);
if self.tab_set.active_tab != 0 {
self.tab_set.stdout_tab.unread_count += 1;
}
self.send(ClientMessage::Ack);
if self.tab_set.stdout_tab.auto_scroll {
return operation::snap_to(
self.tab_set.stdout_tab.id.clone(),
RelativeOffset::END,
);
}
}
CoordinatorMessage::Stderr(string) => {
if self.ui_settings.auto_exit {
eprintln!("{string}");
}
self.tab_set.stderr_tab.content.push(string);
if self.tab_set.active_tab != 1 {
self.tab_set.stderr_tab.unread_count += 1;
}
self.send(ClientMessage::Ack);
if self.tab_set.stderr_tab.auto_scroll {
return operation::snap_to(
self.tab_set.stderr_tab.id.clone(),
RelativeOffset::END,
);
}
}
CoordinatorMessage::GetStdin => {
debug!(
"GetStdin received, buffer has {} lines, cursor at {}",
self.tab_set.stdin_tab.content.len(),
self.tab_set.stdin_tab.cursor
);
if self.ui_settings.auto_exit
&& self.tab_set.stdin_tab.cursor >= self.tab_set.stdin_tab.content.len()
{
let stdin = std::io::stdin();
for line in stdin.lock().lines().map_while(Result::ok) {
self.tab_set.stdin_tab.new_line(line);
}
}
let msg = if let Some(buf) = self.tab_set.stdin_tab.get_all() {
debug!("GetStdin: returning buffered content ({} bytes)", buf.len());
ClientMessage::Stdin(buf)
} else {
debug!("GetStdin: buffer empty, sending GetStdinEof");
ClientMessage::GetStdinEof
};
self.send(msg);
}
CoordinatorMessage::GetLine(_prompt) => {
debug!(
"GetLine received, buffer has {} lines, cursor at {}",
self.tab_set.stdin_tab.content.len(),
self.tab_set.stdin_tab.cursor
);
if self.ui_settings.auto_exit
&& self.tab_set.stdin_tab.cursor >= self.tab_set.stdin_tab.content.len()
{
let mut input = String::new();
match std::io::stdin().lock().read_line(&mut input) {
Ok(n) if n > 0 => {
self.tab_set.stdin_tab.new_line(input.trim().to_string());
}
_ => {} }
}
if let Some(line) = self.tab_set.stdin_tab.get_line() {
trace!("GetLine: returning buffered line: '{line}'");
debug!("GetLine: returning buffered line ({} chars)", line.len());
self.send(ClientMessage::Line(line));
} else if self.ui_settings.auto_exit || self.tab_set.stdin_tab.eof_signaled {
debug!("GetLine: EOF (auto mode or user signaled)");
self.send(ClientMessage::GetLineEof);
self.tab_set.stdin_tab.eof_signaled = false;
} else {
debug!("GetLine: buffer empty, waiting for user input");
self.pending_getline = true;
self.tab_set.stdin_tab.waiting_for_input = true;
self.tab_set.active_tab = 2; }
}
CoordinatorMessage::GetArgs => {
let args = self.flow_arg_vec();
let msg = ClientMessage::Args(args);
self.send(msg);
}
CoordinatorMessage::Read(file_path) => {
let msg = match File::open(&file_path) {
Ok(mut f) => {
let mut buffer = Vec::new();
match f.read_to_end(&mut buffer) {
Ok(_) => {
self.tab_set
.fileio_tab
.content
.push(format!("READ <-- {file_path}"));
if self.tab_set.active_tab != 4 {
self.tab_set.fileio_tab.unread_count += 1;
}
ClientMessage::FileContents(file_path, buffer)
}
Err(e) => {
let msg = format!("Could not read content from '{file_path}': {e}");
self.error(&msg);
ClientMessage::Error(msg)
}
}
}
Err(e) => {
let msg = format!("Could not open file '{file_path}': {e}");
self.error(&msg);
ClientMessage::Error(msg)
}
};
self.send(msg);
}
CoordinatorMessage::Write(filename, bytes) => {
let msg = match File::create(&filename) {
Ok(mut file) => match file.write_all(bytes.as_slice()) {
Ok(()) => {
self.tab_set
.fileio_tab
.content
.push(format!("WRITE --> {filename}"));
if self.tab_set.active_tab != 4 {
self.tab_set.fileio_tab.unread_count += 1;
}
ClientMessage::Ack
}
Err(e) => {
let msg = format!("Error writing to file: '{filename}': '{e}'");
self.error(&msg);
ClientMessage::Error(msg)
}
},
Err(e) => {
let msg = format!("Error creating file: '{filename}': '{e}'");
self.error(&msg);
ClientMessage::Error(msg)
}
};
self.send(msg);
}
CoordinatorMessage::PixelWrite(
(x_coord, y_coord),
(red, green, blue),
(width, height),
ref name,
) => {
if self.tab_set.images_tab.images.is_empty() {
let data = RgbaImage::new(width, height);
self.tab_set.images_tab.images.insert(
name.clone(),
ImageReference {
width,
height,
data,
},
);
if self.tab_set.active_tab != 3 {
self.tab_set.images_tab.new_activity = true;
}
}
if let Some(ImageReference {
width: _,
height: _,
ref mut data,
}) = &mut self.tab_set.images_tab.images.get_mut(name)
{
data.put_pixel(x_coord, y_coord, Rgba([red, green, blue, 255]));
}
self.send(ClientMessage::Ack);
}
CoordinatorMessage::ImageWrite(grid, ref name) => {
let height = u32::try_from(grid.len()).unwrap_or(0);
let width = grid
.first()
.map_or(0, |row| u32::try_from(row.len()).unwrap_or(0));
let data = self
.tab_set
.images_tab
.images
.entry(name.clone())
.or_insert_with(|| ImageReference {
width,
height,
data: RgbaImage::new(width, height),
});
for (y, row) in grid.iter().enumerate() {
for (x, &val) in row.iter().enumerate() {
let gray = val;
data.data.put_pixel(
u32::try_from(x).unwrap_or(0),
u32::try_from(y).unwrap_or(0),
Rgba([gray, gray, gray, 255]),
);
}
}
if self.tab_set.active_tab != 3 {
self.tab_set.images_tab.new_activity = true;
}
self.send(ClientMessage::Ack);
}
CoordinatorMessage::StdoutEof => {
trace!("StdoutEof received");
self.send(ClientMessage::Ack);
}
CoordinatorMessage::StderrEof => {
trace!("StderrEof received");
self.send(ClientMessage::Ack);
}
CoordinatorMessage::Disconnected(reason) => {
self.coordinator_state = crate::CoordinatorState::Disconnected(reason.clone());
self.running = false;
self.error(&reason);
}
CoordinatorMessage::Invalid => {}
}
Task::none()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod test {
use super::*;
#[test]
fn flow_url_absolute_path() {
let url = FlowrGui::flow_url("/tmp/test.toml").expect("Could not create url");
assert_eq!(url.scheme(), "file");
assert!(url.path().ends_with("/tmp/test.toml"));
}
#[test]
fn flow_url_relative_path() {
let url = FlowrGui::flow_url("test.toml").expect("Could not create url");
assert_eq!(url.scheme(), "file");
assert!(url.path().ends_with("/test.toml"));
}
fn test_gui() -> FlowrGui {
FlowrGui {
submission_settings: SubmissionSettings {
flow_manifest_url: String::new(),
flow_args: String::new(),
max_jobs_text: String::new(),
debug_this_flow: false,
display_metrics: false,
parallel_jobs_limit: None,
},
coordinator_settings: CoordinatorSettings::ClientOnly(0),
ui_settings: UiSettings {
auto_start: false,
auto_exit: false,
},
coordinator_state: CoordinatorState::Disconnected("test".into()),
tab_set: TabSet::new(),
submitted: false,
running: false,
show_modal: false,
modal_content: (String::new(), String::new()),
pending_getline: false,
}
}
#[test]
fn max_jobs_valid_number() {
let mut gui = test_gui();
drop(gui.update(Message::MaxJobsChanged("4".into())));
assert_eq!(gui.submission_settings.parallel_jobs_limit, Some(4));
assert_eq!(gui.submission_settings.max_jobs_text, "4");
}
#[test]
fn max_jobs_empty_clears() {
let mut gui = test_gui();
drop(gui.update(Message::MaxJobsChanged("4".into())));
drop(gui.update(Message::MaxJobsChanged(String::new())));
assert_eq!(gui.submission_settings.parallel_jobs_limit, None);
assert_eq!(gui.submission_settings.max_jobs_text, "");
}
#[test]
fn max_jobs_invalid_sets_none() {
let mut gui = test_gui();
drop(gui.update(Message::MaxJobsChanged("abc".into())));
assert_eq!(gui.submission_settings.parallel_jobs_limit, None);
assert_eq!(gui.submission_settings.max_jobs_text, "abc");
}
#[test]
fn debug_submit_without_coordinator_is_noop() {
let mut gui = test_gui();
assert!(!gui.submission_settings.debug_this_flow);
drop(gui.update(Message::DebugSubmitFlow));
assert!(!gui.submitted);
}
#[test]
fn submit_error_shows_modal() {
let mut gui = test_gui();
assert!(!gui.show_modal);
drop(gui.update(Message::SubmitError("test error".into())));
assert!(gui.show_modal);
assert_eq!(gui.modal_content.0, "Error");
assert_eq!(gui.modal_content.1, "test error");
}
#[test]
fn close_modal_hides_it() {
let mut gui = test_gui();
drop(gui.update(Message::SubmitError("test error".into())));
assert!(gui.show_modal);
drop(gui.update(Message::CloseModal));
assert!(!gui.show_modal);
}
#[test]
fn error_method_shows_modal() {
let mut gui = test_gui();
gui.error("something went wrong");
assert!(gui.show_modal);
assert_eq!(gui.modal_content.0, "Error");
assert_eq!(gui.modal_content.1, "something went wrong");
}
#[test]
fn error_modal_renders_with_ok_button() {
use iced_test::simulator::simulator;
let mut gui = test_gui();
drop(gui.update(Message::SubmitError("bad flow path".into())));
let view = gui.view();
let mut ui = simulator(view);
let found = ui.find("OK");
assert!(found.is_ok(), "OK button should be present in error modal");
}
#[test]
fn save_error_shows_modal() {
let mut gui = test_gui();
assert!(!gui.show_modal);
drop(gui.update(Message::SaveError("write failed".into())));
assert!(gui.show_modal);
assert_eq!(gui.modal_content.0, "Error");
assert_eq!(gui.modal_content.1, "write failed");
}
#[test]
fn submitted_sets_flow_name_from_url() {
let mut gui = test_gui();
gui.submission_settings.flow_manifest_url =
"flowr/examples/mandlebrot/manifest.json".into();
drop(gui.update(Message::Submitted));
assert_eq!(gui.tab_set.flow_name, "mandlebrot");
assert!(gui.submitted);
}
#[test]
fn submitted_sets_empty_flow_name_when_no_parent() {
let mut gui = test_gui();
gui.submission_settings.flow_manifest_url = "manifest.json".into();
drop(gui.update(Message::Submitted));
assert!(gui.tab_set.flow_name.is_empty());
}
}