#![warn(clippy::unwrap_used)]
#![deny(missing_docs)]
use core::str::FromStr;
use std::{env, process, thread};
use std::fs::File;
use std::io::prelude::*;
use std::path::PathBuf;
use clap::{Arg, ArgMatches};
use clap::Command as ClapCommand;
use env_logger::Builder;
use iced::{Alignment, Application, Command, Element, Length, Settings, Subscription, Theme};
use iced::alignment::Horizontal;
use iced::executor;
use iced::widget::{Button, Column, Row, scrollable, Text, text_input};
use iced::widget::scrollable::Id;
use iced_aw::{Card, modal};
use image::{ImageBuffer, Rgba, RgbaImage};
use log::{info, LevelFilter, warn};
use simpath::Simpath;
use url::Url;
use flowcore::meta_provider::MetaProvider;
use flowcore::model::flow_manifest::FlowManifest;
use flowcore::model::submission::Submission;
use flowcore::provider::Provider;
use flowcore::url_helper::url_from_string;
use flowrlib::info as flowrlib_info;
use gui::coordinator_connection::CoordinatorConnection;
use gui::debug_message::DebugServerMessage;
use gui::debug_message::DebugServerMessage::*;
use crate::gui::client_message::ClientMessage;
use crate::gui::coordinator_message::CoordinatorMessage;
use crate::tabs::TabSet;
mod context;
mod gui;
mod connection_manager;
mod tabs;
mod errors;
/// Every event that the application's `update` function can receive.
#[derive(Debug, Clone)]
pub enum Message {
    /// The connection to the Coordinator was lost; carries the reason that is
    /// stored in the `Disconnected` coordinator state.
    CoordinatorDisconnected(String),
    /// A message produced by the coordinator connection subscription.
    CoordinatorSent(CoordinatorMessage),
    /// Request to submit the flow described by the current submission settings.
    SubmitFlow,
    /// The asynchronous submission task has completed.
    Submitted,
    /// The flow location text input changed; carries the new text.
    UrlChanged(String),
    /// The flow arguments text input changed; carries the new text.
    FlowArgsChanged(String),
    /// A tab was selected; forwarded to the tab set.
    TabSelected(usize),
    /// New text for the stdin tab.
    NewStdin(String),
    /// A complete line for the stdin tab.
    LineOfStdin(String),
    /// An auto-scroll toggler changed; forwarded to the tab set.
    StdioAutoScrollTogglerChanged(Id, bool),
    /// Hide the modal dialog.
    CloseModal,
}
/// Whether the GUI currently has a channel to the Coordinator.
enum CoordinatorState {
    /// No connection; carries a human readable reason shown in the status row.
    Disconnected(String),
    /// Connected; carries the sender used to pass `ClientMessage`s to the Coordinator.
    Connected(tokio::sync::mpsc::Sender<ClientMessage>),
}
/// Program entry point: run the `FlowrGui` iced application with antialiasing
/// enabled and all other settings left at their defaults.
fn main() -> iced::Result {
    let settings = Settings {
        antialiasing: true,
        ..Settings::default()
    };

    FlowrGui::run(settings)
}
/// Settings that describe the flow to submit and how it should be run.
#[derive(Clone)]
struct SubmissionSettings {
    /// Location of the flow manifest, as typed by the user or given on the command line.
    flow_manifest_url: String,
    /// The flow's arguments, held as a single space separated string.
    flow_args: String,
    /// Passed to the submission to request debugging of the flow.
    debug_this_flow: bool,
    /// When true, the metrics reported at the end of the flow are shown in a modal.
    display_metrics: bool,
    /// Optional limit on the number of jobs, passed to the submission.
    parallel_jobs_limit: Option<usize>,
}
/// Settings used when this process also runs the Coordinator (i.e. it is not
/// started as a client only).
#[derive(Clone)]
pub struct ServerSettings {
    /// Value of the `native` command line flag.
    native_flowstdlib: bool,
    /// Number of threads to use to execute jobs.
    num_threads: usize,
    /// Search path used to locate libraries.
    lib_search_path: Simpath,
}
/// How the GUI gets its Coordinator, as selected on the command line.
#[derive(Clone)]
pub enum CoordinatorSettings {
    /// No `client` option was given: carries the server settings to use.
    Server(ServerSettings),
    /// The `client` option was given: carries the port number supplied with it.
    ClientOnly(u16),
}
/// Settings that affect only the behaviour of the user interface.
struct UiSettings {
    /// Value of the `auto` command line flag: submit the flow as soon as the
    /// Coordinator connects, and exit the process when the flow ends.
    auto: bool,
}
struct ImageReference {
pub width: u32,
pub height: u32,
pub data: ImageBuffer<Rgba<u8>, Vec<u8>>,
}
/// The complete state of the GUI application.
struct FlowrGui {
    /// The flow to submit and the options to submit it with.
    submission_settings: SubmissionSettings,
    /// How to obtain a Coordinator; passed to the connection subscription.
    coordinator_settings: CoordinatorSettings,
    /// User interface options.
    ui_settings: UiSettings,
    /// Current connection state to the Coordinator.
    coordinator_state: CoordinatorState,
    /// The tabs showing stdio, file I/O and images.
    tab_set: TabSet,
    /// True between the Coordinator's `FlowStart` and `FlowEnd` messages.
    running: bool,
    /// True once a submission has completed, until the flow starts.
    submitted: bool,
    /// Whether the modal dialog is currently shown.
    show_modal: bool,
    /// The two strings shown in the modal card: first the head, then the body.
    modal_content: (String, String),
}
impl Application for FlowrGui {
type Executor = executor::Default;
type Message = Message;
type Theme = Theme;
type Flags = ();
/// Create the initial application state from the command line options.
///
/// The application starts disconnected (reason "Starting"), with nothing
/// submitted or running and no modal shown. No initial command is issued.
fn new(_flags: ()) -> (Self, Command<Message>) {
    let (submission_settings, coordinator_settings, ui_settings) =
        FlowrGui::initial_settings();

    let app = FlowrGui {
        submission_settings,
        coordinator_settings,
        ui_settings,
        coordinator_state: CoordinatorState::Disconnected(String::from("Starting")),
        tab_set: TabSet::new(),
        running: false,
        submitted: false,
        show_modal: false,
        modal_content: (String::new(), String::new()),
    };

    (app, Command::none())
}
/// The window title.
fn title(&self) -> String {
    "flowrgui".to_owned()
}
/// Apply `message` to the application state and return any follow-up command.
///
/// The first `Connected` message from the Coordinator is handled here (it
/// stores the sender and, in auto mode, triggers a submission); every other
/// Coordinator message is delegated to `process_coordinator_message`.
fn update(&mut self, message: Message) -> Command<Message> {
    match message {
        Message::CoordinatorSent(CoordinatorMessage::Connected(sender)) => {
            self.coordinator_state = CoordinatorState::Connected(sender);
            if self.ui_settings.auto {
                Command::perform(Self::auto_submit(), |_| Message::SubmitFlow)
            } else {
                Command::none()
            }
        }
        // Must stay after the more specific `Connected` arm above
        Message::CoordinatorSent(coord_msg) => self.process_coordinator_message(coord_msg),
        Message::SubmitFlow => match &self.coordinator_state {
            CoordinatorState::Connected(sender) => Command::perform(
                Self::submit(sender.clone(), self.submission_settings.clone()),
                |_| Message::Submitted,
            ),
            // Nothing to submit to while disconnected
            CoordinatorState::Disconnected(_) => Command::none(),
        },
        Message::Submitted => {
            self.tab_set.clear();
            self.submitted = true;
            Command::none()
        }
        Message::FlowArgsChanged(value) => {
            self.submission_settings.flow_args = value;
            Command::none()
        }
        Message::UrlChanged(value) => {
            self.submission_settings.flow_manifest_url = value;
            Command::none()
        }
        // Tab related messages are handled by the tab set itself
        Message::TabSelected(_) | Message::StdioAutoScrollTogglerChanged(_, _) => {
            self.tab_set.update(message)
        }
        Message::CloseModal => {
            self.show_modal = false;
            Command::none()
        }
        Message::CoordinatorDisconnected(reason) => {
            self.coordinator_state = CoordinatorState::Disconnected(reason);
            Command::none()
        }
        Message::NewStdin(text) => {
            self.tab_set.stdin_tab.text_entered(text);
            Command::none()
        }
        Message::LineOfStdin(line) => {
            self.tab_set.stdin_tab.new_line(line);
            Command::none()
        }
    }
}
/// Build the widget tree: a column holding the command row, the tab set and
/// the status row, wrapped in a modal that shows a card when `show_modal` is set.
///
/// The card uses the first string of `modal_content` as its head and the
/// second as its body, with a single "OK" button in the foot. Pressing the
/// button, clicking the backdrop or pressing escape all emit `CloseModal`.
fn view(&self) -> Element<'_, Message> {
    let main = Column::new()
        .spacing(10)
        .push(self.command_row())
        .push(self.tab_set.view())
        .push(self.status_row())
        .padding(10);

    let overlay = if self.show_modal {
        // Clone only the string each `Text` needs, not the whole tuple twice
        let (head, body) = &self.modal_content;

        let ok_button = Button::new(
            Text::new("OK").horizontal_alignment(Horizontal::Center))
            .width(Length::Fill)
            .on_press(Message::CloseModal);

        let foot = Row::new()
            .spacing(10)
            .padding(5)
            .width(Length::Fill)
            .push(ok_button);

        Some(
            Card::new(Text::new(head.clone()), Text::new(body.clone()))
                .foot(foot)
                .max_width(300.0),
        )
    } else {
        None
    };

    modal(main, overlay)
        .backdrop(Message::CloseModal)
        .on_esc(Message::CloseModal)
        .into()
}
/// Subscribe to the coordinator connection, wrapping each message it produces
/// in `Message::CoordinatorSent`.
fn subscription(&self) -> Subscription<Message> {
    let settings = self.coordinator_settings.clone();
    connection_manager::subscribe(settings).map(Message::CoordinatorSent)
}
}
impl FlowrGui {
/// Future run when the Coordinator connects in auto mode. It only logs; the
/// caller maps its completion to a `SubmitFlow` message.
async fn auto_submit() {
    info!("Auto submitting flow");
}
async fn submit(sender: tokio::sync::mpsc::Sender<ClientMessage>,
settings: SubmissionSettings) {
match Self::flow_url(settings.flow_manifest_url) {
Ok(url) => {
let provider = &MetaProvider::new(Simpath::new(""),
PathBuf::default()) as &dyn Provider;
match FlowManifest::load(provider, &url) {
Ok((flow_manifest, _)) => {
let submission = Submission::new(
flow_manifest,
settings.parallel_jobs_limit,
None, settings.debug_this_flow,
);
info!("Sending submission to Coordinator");
match sender.send(ClientMessage::ClientSubmission(submission)).await {
Ok(_) => {
}
Err(_) => {
}
}
}
Err(_e) => {
}
}
},
Err(_e) => {
}
}
}
fn error<S>(&mut self, _msg: S) where S: Into<String> {
}
fn info(&mut self, _msg: String) {
}
/// Build the top row: the flow location input, the flow arguments input and
/// the "Play" button.
///
/// The button is only enabled (given an `on_press` message) while connected to
/// the Coordinator with no flow submitted or running.
fn command_row(&self) -> Row<Message> {
    let settings = &self.submission_settings;

    let can_submit = matches!(self.coordinator_state, CoordinatorState::Connected(_))
        && !(self.running || self.submitted);

    let url_input = text_input("Flow location (relative, or absolute)",
                               &settings.flow_manifest_url)
        .on_input(Message::UrlChanged);

    let args_input = text_input("Space separated flow arguments",
                                &settings.flow_args)
        .on_submit(Message::SubmitFlow)
        .on_input(Message::FlowArgsChanged)
        .on_paste(Message::FlowArgsChanged);

    let play_button = Button::new("Play");
    let play_button = if can_submit {
        play_button.on_press(Message::SubmitFlow)
    } else {
        play_button
    };

    Row::new()
        .spacing(10)
        .align_items(Alignment::End)
        .push(url_input)
        .push(args_input)
        .push(play_button)
}
/// Build the bottom row: a single text describing the Coordinator connection
/// and, when connected, whether a flow is ready, submitted or running.
fn status_row(&self) -> Row<Message> {
    let status = match &self.coordinator_state {
        CoordinatorState::Disconnected(reason) => format!("Disconnected({reason})"),
        CoordinatorState::Connected(_) => {
            // "Running" takes precedence over "Submitted"
            let activity = if self.running {
                "Running"
            } else if self.submitted {
                "Submitted"
            } else {
                "Ready"
            };
            format!("Connected({})", activity)
        }
    };

    let label = Text::new(format!("Coordinator: {}", status));
    Row::new().push(label)
}
fn initial_settings() -> (SubmissionSettings, CoordinatorSettings, UiSettings) {
let matches = Self::parse_cli_args();
let default = String::from("error");
let verbosity = matches.get_one::<String>("verbosity").unwrap_or(&default);
let level = LevelFilter::from_str(verbosity).unwrap_or(LevelFilter::Error);
let mut builder = Builder::from_default_env();
builder.filter_level(level).init();
info!("'{}' version {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
info!("'flowrlib' version {}", flowrlib_info::version());
let flow_manifest_url = matches.get_one::<String>("flow-manifest")
.unwrap_or(&"".into()).to_string();
let flow_args = match matches.get_many::<String>("flow-args") {
Some(values) => {
println!("values {:?}", values);
values.map(|s| s.to_string())
.collect::<Vec<String>>().join(" ")
},
None => String::new()
};
let parallel_jobs_limit = matches.get_one::<usize>("jobs").map(|i| i.to_owned());
let debug_this_flow = matches.get_flag("debugger");
let coordinator_settings = match matches.get_one::<u16>("client") {
Some(port) => CoordinatorSettings::ClientOnly(*port),
None => {
let lib_dirs = if matches.contains_id("lib_dir") {
if let Some(dirs) = matches.get_many::<String>("lib_dir") {
dirs.map(|s| s.to_string()).collect()
} else {
vec![]
}
} else {
vec![]
};
let lib_search_path = FlowrGui::lib_search_path(&lib_dirs);
let native_flowstdlib = matches.get_flag("native");
let num_threads = FlowrGui::num_threads(&matches);
let server_settings = ServerSettings {
num_threads,
native_flowstdlib,
lib_search_path,
};
CoordinatorSettings::Server(server_settings) },
};
(SubmissionSettings {
flow_manifest_url,
flow_args,
debug_this_flow,
display_metrics: matches.get_flag("metrics"),
parallel_jobs_limit,
},
coordinator_settings,
UiSettings {
auto: matches.get_flag("auto")
}
)
}
/// Define the command line interface and parse the process's arguments.
///
/// Options taking a value use `num_args(1)`. The `lib_dir` option uses the
/// `Append` action so that it can be given several times, each occurrence
/// adding one entry that is later read with `get_many`.
///
/// The `native` flag is only defined when the `wasm` feature is not enabled.
fn parse_cli_args() -> ArgMatches {
    let app = ClapCommand::new(env!("CARGO_PKG_NAME"))
        .version(env!("CARGO_PKG_VERSION"))
        .arg(Arg::new("debugger")
            .short('d')
            .long("debugger")
            .action(clap::ArgAction::SetTrue)
            .help("Enable the debugger when running a flow"));

    #[cfg(not(feature = "wasm"))]
    let app = app
        .arg(Arg::new("native")
            .short('n')
            .long("native")
            .action(clap::ArgAction::SetTrue)
            .help("Link with native (not WASM) version of flowstdlib"));

    let app = app
        .arg(Arg::new("client")
            .short('c')
            .long("client")
            .num_args(1)
            .value_parser(clap::value_parser!(u16))
            .help("Launch only a client (no coordinator) to connect to a remote coordinator"))
        .arg(Arg::new("metrics")
            .short('m')
            .long("metrics")
            .action(clap::ArgAction::SetTrue)
            .help("Calculate metrics during flow execution and print them out when done"))
        .arg(Arg::new("auto")
            .short('a')
            .long("auto")
            .action(clap::ArgAction::SetTrue)
            .help("Run any flow specified automatically on start-up. Exit automatically."))
        .arg(Arg::new("jobs")
            .short('j')
            .long("jobs")
            .num_args(1)
            .value_parser(clap::value_parser!(usize))
            .value_name("MAX_JOBS")
            .help("Set maximum number of jobs that can be running in parallel"))
        .arg(Arg::new("lib_dir")
            .short('L')
            .long("libdir")
            // One value per occurrence, any number of occurrences
            .num_args(1)
            .action(clap::ArgAction::Append)
            .value_name("LIB_DIR|BASE_URL")
            .help("Add a directory or base Url to the Library Search path"))
        .arg(Arg::new("threads")
            .short('t')
            .long("threads")
            .num_args(1)
            .value_parser(clap::value_parser!(usize))
            .value_name("THREADS")
            .help("Set number of threads to use to execute jobs (min: 1, default: cores available)"))
        .arg(Arg::new("verbosity")
            .short('v')
            .long("verbosity")
            .num_args(1)
            .value_name("VERBOSITY_LEVEL")
            .help("Set verbosity level for output (trace, debug, info, warn, default: error)"))
        .arg(Arg::new("flow-manifest")
            .num_args(1)
            .help("the file path of the 'flow' manifest file"))
        .arg(Arg::new("flow-args")
            .num_args(0..)
            .trailing_var_arg(true)
            .help("A list of arguments to pass to the flow."));

    app.get_matches()
}
/// Resolve `flow_url_string` into a `Url`, using the current working directory
/// (as a directory url) as the base passed to `url_from_string`.
///
/// # Errors
/// Fails if the current directory cannot be determined, if it cannot be
/// expressed as a url, or if `url_from_string` fails.
fn flow_url(flow_url_string: String) -> flowcore::errors::Result<Url> {
    let cwd = env::current_dir()?;
    let cwd_url = Url::from_directory_path(cwd)
        .map_err(|_| "Could not form a Url for the current working directory")?;

    url_from_string(&cwd_url, Some(&flow_url_string))
}
/// Build the argument vector for the flow: the flow manifest url first,
/// followed by the space separated arguments from `flow_args`.
///
/// Empty fragments are dropped, so an empty argument string or repeated
/// spaces do not produce empty-string arguments.
fn flow_arg_vec(&self) -> Vec<String> {
    let settings = &self.submission_settings;

    std::iter::once(settings.flow_manifest_url.as_str())
        .chain(settings.flow_args.split(' ').filter(|arg| !arg.is_empty()))
        .map(String::from)
        .collect()
}
fn lib_search_path(search_path_additions: &[String]) -> Simpath {
let mut lib_search_path = Simpath::new_with_separator("FLOW_LIB_PATH",
',');
for additions in search_path_additions {
lib_search_path.add(additions);
info!("'{}' added to the Library Search Path", additions);
}
if lib_search_path.is_empty() {
let home_dir = env::var("HOME")
.unwrap_or_else(|_| "Could not get $HOME".to_string());
lib_search_path.add(&format!("{}/.flow/lib", home_dir))
}
lib_search_path
}
/// Determine the number of threads to use to execute jobs.
///
/// Uses the `threads` option when given, otherwise the parallelism reported
/// as available by the standard library (1 if that cannot be determined). The
/// result is never less than 1, matching the minimum documented in the
/// option's help text.
fn num_threads(matches: &ArgMatches) -> usize {
    matches
        .get_one::<usize>("threads")
        .copied()
        .unwrap_or_else(|| thread::available_parallelism().map_or(1, |n| n.get()))
        .max(1)
}
fn send(&mut self, msg: ClientMessage) {
if let CoordinatorState::Connected(ref sender) = self.coordinator_state {
let _ = sender.try_send(msg);
}
}
fn process_coordinator_message(&mut self, message: CoordinatorMessage) -> Command<Message> {
match message {
CoordinatorMessage::Connected(_) => {
self.error("Coordinator is already connected");
},
CoordinatorMessage::FlowStart => {
self.running = true;
self.submitted = false;
self.send(ClientMessage::Ack);
},
CoordinatorMessage::FlowEnd(metrics) => {
self.running = false;
if self.submission_settings.display_metrics {
self.show_modal = true;
self.modal_content = ("Flow Ended - Metrics".into(),
format!("{}", metrics));
}
if self.ui_settings.auto {
self.info("Auto exiting on flow completion".into());
process::exit(0);
}
}
CoordinatorMessage::CoordinatorExiting(_) => {
self.coordinator_state = CoordinatorState::Disconnected("Exited".into());
self.send(ClientMessage::Ack);
},
CoordinatorMessage::Stdout(string) => {
self.tab_set.stdout_tab.content.push(string);
self.send(ClientMessage::Ack);
if self.tab_set.stdout_tab.auto_scroll {
return scrollable::snap_to(
self.tab_set.stdout_tab.id.clone(), scrollable::RelativeOffset::END);
}
},
CoordinatorMessage::Stderr(string) => {
self.tab_set.stderr_tab.content.push(string);
self.send(ClientMessage::Ack);
if self.tab_set.stderr_tab.auto_scroll {
return scrollable::snap_to(
self.tab_set.stderr_tab.id.clone(), scrollable::RelativeOffset::END);
}
},
CoordinatorMessage::GetStdin => {
let msg = match self.tab_set.stdin_tab.get_all() {
Some(buf) => ClientMessage::Stdin(buf),
None => ClientMessage::GetLineEof,
};
self.send(msg);
}
CoordinatorMessage::GetLine(prompt) => {
let msg = match self.tab_set.stdin_tab.get_line(prompt) {
Some(line) => ClientMessage::Line(line),
None => ClientMessage::GetLineEof,
};
self.send(msg);
}
CoordinatorMessage::GetArgs => {
let args = self.flow_arg_vec();
let msg = ClientMessage::Args(args);
self.send(msg);
}
CoordinatorMessage::Read(file_path) => {
let msg = match File::open(&file_path) {
Ok(mut f) => {
let mut buffer = Vec::new();
match f.read_to_end(&mut buffer) {
Ok(_) => {
self.tab_set.fileio_tab.content.push(
format!("READ <-- {}", file_path));
ClientMessage::FileContents(file_path, buffer)
},
Err(_) => ClientMessage::Error(format!(
"Could not read content from '{file_path:?}'"
)),
}
}
Err(_) => ClientMessage::Error(format!("Could not open file '{file_path:?}'")),
};
self.send(msg);
},
CoordinatorMessage::Write(filename, bytes) => {
let msg = match File::create(&filename) {
Ok(mut file) => match file.write_all(bytes.as_slice()) {
Ok(_) => {
self.tab_set.fileio_tab.content.push(
format!("WRITE --> {}", filename));
ClientMessage::Ack
},
Err(e) => {
let msg = format!("Error writing to file: '{filename}': '{e}'");
self.error("{msg}");
ClientMessage::Error(msg)
}
},
Err(e) => {
let msg = format!("Error creating file: '{filename}': '{e}'");
self.error("{msg}");
ClientMessage::Error(msg)
}
};
self.send(msg);
},
CoordinatorMessage::PixelWrite((x, y), (r, g, b), (width, height), ref name) => {
if self.tab_set.images_tab.images.is_empty() {
let data = RgbaImage::new(width, height);
self.tab_set.images_tab.images.insert(name.clone(), ImageReference {width, height, data });
}
if let Some(ImageReference{width: _, height: _, ref mut data})
= &mut self.tab_set.images_tab.images.get_mut(name) {
data.put_pixel(x, y, Rgba([r, g, b, 255]));
}
self.send(ClientMessage::Ack);
}
_ => {},
}
Command::none()
}
}