use clap::{ArgMatches, Command};
use crate::databases::redis::{add_redis_command_arguments, new_redis_async};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::process::ExitCode;
use std::sync::{Arc, Mutex};
use tokio::runtime::Builder;
use crate::context::{set_puff_context, set_puff_context_waiting, PuffContext};
use crate::databases::postgres::{add_postgres_command_arguments, new_postgres_async};
use crate::databases::pubsub::{add_pubsub_command_arguments, new_pubsub_async};
use crate::errors::{handle_puff_error, PuffResult, Result};
use crate::graphql::load_schema;
use crate::python::{bootstrap_puff_globals, setup_greenlet};
use crate::runtime::RuntimeConfig;
use crate::types::text::Text;
use crate::types::Puff;
pub use clap;
use tracing::info;
pub mod commands;
pub struct Runnable(Pin<Box<dyn Future<Output = Result<ExitCode>> + 'static>>);
impl Runnable {
pub fn new<F: Future<Output = Result<ExitCode>> + 'static>(inner: F) -> Self {
Self(Box::pin(inner))
}
}
pub trait RunnableCommand: 'static {
fn cli_parser(&self) -> Command;
fn make_runnable(&mut self, args: &ArgMatches, dispatcher: PuffContext) -> Result<Runnable>;
}
struct PackedCommand(Box<dyn RunnableCommand>);
impl PackedCommand {
pub fn cli_parser(&self) -> Command {
self.0.cli_parser()
}
pub fn make_runnable(
&mut self,
args: &ArgMatches,
dispatcher: PuffContext,
) -> Result<Runnable> {
self.0.make_runnable(args, dispatcher)
}
}
fn handle_puff_exit(label: &str, r: PuffResult<ExitCode>) -> ExitCode {
match r {
Ok(exit) => exit,
Err(e) => {
handle_puff_error(label, e);
ExitCode::FAILURE
}
}
}
pub struct Program {
name: Text,
author: Option<Text>,
version: Option<Text>,
about: Option<Text>,
after_help: Option<Text>,
commands: Vec<PackedCommand>,
runtime_config: RuntimeConfig,
}
impl Program {
pub fn new<T: Into<Text>>(name: T) -> Self {
Self {
name: name.into(),
commands: Vec::new(),
version: None,
about: None,
after_help: None,
author: None,
runtime_config: RuntimeConfig::default(),
}
}
pub fn runtime_config(self, runtime_config: RuntimeConfig) -> Self {
let mut s = self;
s.runtime_config = runtime_config;
s
}
pub fn author<T: Into<Text>>(self, author: T) -> Self {
let mut s = self;
s.author = Some(author.into());
s
}
pub fn version<T: Into<Text>>(self, version: T) -> Self {
let mut s = self;
s.version = Some(version.into());
s
}
pub fn about<T: Into<Text>>(self, about: T) -> Self {
let mut s = self;
s.about = Some(about.into());
s
}
pub fn after_help<T: Into<Text>>(self, after_help: T) -> Self {
let mut s = self;
s.after_help = Some(after_help.into());
s
}
pub fn command<C: RunnableCommand>(self, command: C) -> Self {
let box_command = Box::new(command);
let mut new_self = self;
new_self.commands.push(PackedCommand(box_command));
new_self
}
fn clap_command(&self) -> Command {
let mut tl = Command::new(self.name.clone().into_string()).arg_required_else_help(true);
if let Some(author) = &self.author {
tl = tl.author(author.to_string());
}
if let Some(about) = &self.about {
tl = tl.about(about.to_string());
}
if let Some(version) = &self.version {
tl = tl.version(version.to_string());
}
if let Some(after_help) = &self.after_help {
tl = tl.after_help(after_help.to_string());
}
tl
}
fn runtime(&self) -> Result<Builder> {
let current_thread = self.runtime_config.tokio_worker_threads() == 1;
let mut rt = if current_thread {
tokio::runtime::Builder::new_current_thread()
} else {
tokio::runtime::Builder::new_multi_thread()
};
rt.enable_all()
.worker_threads(self.runtime_config.tokio_worker_threads())
.max_blocking_threads(self.runtime_config.max_blocking_threads())
.thread_keep_alive(self.runtime_config.blocking_task_keep_alive());
Ok(rt)
}
pub fn run(self) -> ExitCode {
handle_puff_exit("Program", self.try_run())
}
pub fn try_run(self) -> Result<ExitCode> {
tracing_subscriber::fmt::init();
let mut top_level = self.clap_command();
let rt_config = self.runtime_config.clone();
if rt_config.postgres() {
top_level = add_postgres_command_arguments(top_level)
}
if rt_config.redis() {
top_level = add_redis_command_arguments(top_level)
}
if rt_config.pubsub() {
top_level = add_pubsub_command_arguments(top_level)
}
rt_config.apply_env_vars();
let mutex_switcher = Arc::new(Mutex::new(None::<PuffContext>));
let python_dispatcher = if rt_config.python() {
pyo3::prepare_freethreaded_python();
bootstrap_puff_globals(rt_config.clone())?;
let dispatcher = setup_greenlet(rt_config.clone(), mutex_switcher.clone())?;
Some(dispatcher)
} else {
None
};
let mut builder = self.runtime()?;
let mut hm: HashMap<Text, PackedCommand> = HashMap::with_capacity(self.commands.len());
for packed_command in self.commands {
let parser = packed_command.cli_parser();
let name = parser.get_name().to_owned();
top_level = top_level.subcommand(parser);
hm.insert(name.into(), packed_command);
}
let arg_matches = top_level.get_matches();
if let Some((command, args)) = arg_matches.subcommand() {
if let Some(mut runner) = hm.remove(&command.to_string().into()) {
let thread_mutex = mutex_switcher.clone();
builder.on_thread_start(move || {
set_puff_context_waiting(thread_mutex.clone());
});
let rt = builder.build()?;
let mut redis = None;
if rt_config.redis() {
redis = Some(rt.block_on(new_redis_async(
arg_matches.get_one::<String>("redis_url").unwrap().as_str(),
true,
rt_config.redis_pool_size()
))?);
}
let mut pubsub_client = None;
if rt_config.pubsub() {
pubsub_client = Some(
rt.block_on(new_pubsub_async(
arg_matches
.get_one::<String>("pubsub_url")
.unwrap()
.as_str(),
true,
))?,
);
}
let mut postgres = None;
if rt_config.postgres() {
postgres = Some(
rt.block_on(new_postgres_async(
arg_matches
.get_one::<String>("postgres_url")
.unwrap()
.as_str(),
true,
rt_config.postgres_pool_size()
))?,
);
}
let mut gql_root = None;
if let Some(mod_path) = rt_config.gql_module() {
let res = rt.block_on(load_schema(
mod_path,
python_dispatcher.clone().expect("GQL needs Python"),
))?;
gql_root = Some(res);
}
let context = PuffContext::new_with_options(
rt.handle().clone(),
redis,
postgres,
python_dispatcher,
pubsub_client.clone(),
gql_root.clone(),
);
set_puff_context(context.puff());
pubsub_client.map(|c| c.start_supervised_listener());
let context_to_set = context.puff();
{
let mut borrowed = mutex_switcher.lock().unwrap();
*borrowed = Some(context_to_set);
}
let runnable = runner.make_runnable(args, context)?;
let main_fut = async {
let shutdown = tokio::signal::ctrl_c();
tokio::select! {
res = runnable.0 => {
return res
}
_ = shutdown => {
info!("shutting down");
return Ok(ExitCode::SUCCESS)
}
}
};
return rt.block_on(main_fut);
}
}
Ok(ExitCode::SUCCESS)
}
}