use std::ffi::OsStr;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use std::{error::Error, path::Path};
use anyhow::{Context, Result, bail};
use clap::Parser;
use flexi_logger::LogSpecification;
use parking_lot::Mutex;
use sysinfo::{Pid, ProcessStatus, System};
use termusiclib::config::v2::server::config_extra::ServerConfigVersionedDefaulted;
use termusiclib::config::v2::server::{ComProtocol, ScanDepth};
use termusiclib::config::v2::tui::config_extra::TuiConfigVersionedDefaulted;
use termusiclib::config::{
ServerOverlay, SharedServerSettings, SharedTuiSettings, TuiOverlay, new_shared_server_settings,
new_shared_tui_settings,
};
use termusiclib::player::music_player_client::MusicPlayerClient;
use termusiclib::{podcast, utils};
use tokio::io::AsyncReadExt;
use tokio::process::{Child, Command};
use tokio::sync::RwLock;
use tokio::task::AbortHandle;
use tokio_util::sync::CancellationToken;
use ui::UI;
mod cli;
mod logger;
mod ui;
#[macro_use]
extern crate log;
pub const MAX_DEPTH: usize = 4;
#[derive(Debug, Clone)]
pub struct CombinedSettings {
pub server: SharedServerSettings,
pub tui: SharedTuiSettings,
}
fn main() -> Result<()> {
let runtime =
tokio::runtime::Runtime::new().expect("Expected the Tokio Runtime to start correctly");
let res = runtime.block_on(actual_main());
trace!("Shutting down the runtime with a timeout");
let before = Instant::now();
runtime.shutdown_timeout(Duration::from_secs(5));
trace!("Tokio Exited after {:#?}", before.elapsed());
if let Err(err) = res {
error!("Error: {err:?}");
return Err(err);
}
Ok(())
}
pub static SERVER_PID: OnceLock<Pid> = OnceLock::new();
async fn actual_main() -> Result<()> {
let args = cli::Args::parse();
let mut logger_handle = logger::setup(&args);
let config = get_config(&args)?;
ctrl_c_handler().expect("Error setting Ctrl-C handler");
if let Some(action) = args.action {
return execute_action(action, &config).await;
}
let (pid, child) = {
let active_pid = find_active_server_process();
if let Some(pid) = active_pid {
(pid.as_u32(), None)
} else {
let child = launch_server(&args)?;
(child.id().unwrap(), Some(child))
}
};
let server_output = child.map(collect_server_output);
println!("Server process ID: {pid}");
SERVER_PID
.set(Pid::from_u32(pid))
.unwrap_or_else(|_| error!("Could not set SERVER_PID."));
if !args.log_options.log_to_file {
logger_handle.set_new_spec(LogSpecification::off());
} else if let Err(err) =
logger_handle.adapt_duplication_to_stderr(flexi_logger::Duplicate::None)
{
warn!("flexi_logger error: {err}");
}
info!("Waiting until connected");
let (client, addr) = match wait_till_connected(&config, pid).await {
Ok(v) => v,
Err(err) => {
if let Some(server_output) = server_output {
server_output.cancel_token.cancel();
let stdout = server_output.stdout.read().await;
let stderr = server_output.stderr.read().await;
let stdout = String::from_utf8_lossy(&stdout).to_string();
let stderr = String::from_utf8_lossy(&stderr).to_string();
return Err(err.context(format!(
"Server output during start:\n---STDOUT---\n{stdout}\n---STDERR---\n{stderr}\n---"
)));
}
return Err(err);
}
};
info!("Connected on {addr}");
if let Some(server_output) = server_output {
server_output.cancel_token.cancel();
}
let mut ui = UI::new(config, client).await?;
ui.run()?;
info!("Bye");
Ok(())
}
#[derive(Debug)]
struct ServerOutput {
stdout: RwLock<Vec<u8>>,
stderr: RwLock<Vec<u8>>,
cancel_token: CancellationToken,
}
fn collect_server_output(mut child: Child) -> Arc<ServerOutput> {
let output = Arc::new(ServerOutput {
stdout: RwLock::new(Vec::new()),
stderr: RwLock::new(Vec::new()),
cancel_token: CancellationToken::new(),
});
let res = output.clone();
let (Some(mut stdout), Some(mut stderr)) = (child.stdout.take(), child.stderr.take()) else {
warn!("Somehow spawned server stdout or stderr are not available!");
return output;
};
tokio::spawn(async move {
let mut handle_stdout = output.stdout.write().await;
let mut handle_stderr = output.stderr.write().await;
let cancel_token = output.cancel_token.clone();
loop {
tokio::select! {
_ = stdout.read_buf(&mut *handle_stdout) => {
},
_ = stderr.read_buf(&mut *handle_stderr) => {
}
() = cancel_token.cancelled() => {
break;
}
}
}
debug!("Server log collection task ended");
});
res
}
fn launch_server(args: &cli::Args) -> Result<Child> {
let termusic_server_prog = get_server_binary_exe()?;
let mut server_args = vec![];
if args.log_options.file_color_log {
server_args.push("--log-filecolor");
}
if let Some(backend) = args.backend {
server_args.push("--backend");
server_args.push(backend.as_str());
}
#[allow(clippy::zombie_processes)]
let proc = spawn_process(&termusic_server_prog, true, &server_args).context(format!(
"Could not start binary \"{}\"",
termusic_server_prog.display()
))?;
Ok(proc)
}
fn spawn_process<A: IntoIterator<Item = S> + Clone, S: AsRef<OsStr>>(
prog: &Path,
piped: bool,
args: A,
) -> std::io::Result<Child> {
let mut cmd = Command::new(prog);
cmd.stdin(Stdio::null());
if piped {
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
} else {
cmd.stdout(Stdio::null());
cmd.stderr(Stdio::null());
}
#[cfg(windows)]
{
use windows::Win32::System::Threading::DETACHED_PROCESS;
cmd.creation_flags(DETACHED_PROCESS.0);
}
cmd.args(args);
cmd.spawn()
}
fn find_active_server_process() -> Option<Pid> {
let mut system = System::new();
system.refresh_all();
for (id, proc) in system.processes() {
let Some(exe) = proc.exe().map(|v| v.display().to_string()) else {
continue;
};
if exe.contains("termusic-server") {
return Some(*id);
}
}
None
}
fn get_server_binary_exe() -> Result<PathBuf> {
let mut termusic_server_prog = std::path::PathBuf::from("termusic-server");
let potential_server_exe = {
let mut exe = std::env::current_exe()?;
exe.pop();
exe.join(&termusic_server_prog)
};
if potential_server_exe.exists() {
termusic_server_prog = potential_server_exe;
}
Ok(termusic_server_prog)
}
const WAIT_TIMEOUT: Duration = Duration::from_secs(30);
const WAIT_MESSAGE_TIME: Duration = Duration::from_secs(5);
const WAIT_INTERVAL: Duration = Duration::from_millis(100);
async fn wait_till_connected(
config: &CombinedSettings,
pid: u32,
) -> Result<(MusicPlayerClient<tonic::transport::Channel>, String)> {
let protocol = config.tui.read().settings.get_com().unwrap().protocol;
let player = match protocol {
ComProtocol::HTTP => wait_till_connected_tcp(config, pid).await?,
ComProtocol::UDS => wait_till_connected_uds(config, pid).await?,
};
Ok(player)
}
async fn wait_till_connected_tcp(
config: &CombinedSettings,
pid: u32,
) -> Result<(MusicPlayerClient<tonic::transport::Channel>, String)> {
let addr = {
let config_read = config.tui.read();
SocketAddr::from(config_read.settings.get_com().ok_or(anyhow::anyhow!(
"Expected tui-com settings to be resolved at this point"
))?)
};
let addr = format!("http://{addr}");
let mut sys = sysinfo::System::new();
let sys_pid = Pid::from_u32(pid);
let start_time = Instant::now();
let _msg_handle = start_message_timeout();
loop {
if Instant::now() > start_time + WAIT_TIMEOUT {
anyhow::bail!(
"Could not connect within {} timeout.",
WAIT_TIMEOUT.as_secs()
);
}
sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[sys_pid]), true);
let status = sys.process(sys_pid);
if status.is_none_or(|v| v.status() == ProcessStatus::Zombie) {
anyhow::bail!("Process {pid} exited before being able to connect!");
}
match MusicPlayerClient::connect(addr.clone()).await {
Err(err) => {
if let Some(os_err) = find_source::<std::io::Error>(&err)
&& os_err.kind() == std::io::ErrorKind::ConnectionRefused
{
debug!("Connection refused found!");
tokio::time::sleep(WAIT_INTERVAL).await;
continue;
}
anyhow::bail!(err);
}
Ok(client) => return Ok((client, addr)),
}
}
}
async fn wait_till_connected_uds(
config: &CombinedSettings,
pid: u32,
) -> Result<(MusicPlayerClient<tonic::transport::Channel>, String)> {
let addr = {
let config_read = config.tui.read();
let addr = config_read
.settings
.get_com()
.unwrap()
.socket_path
.to_string_lossy();
format!("unix://{addr}")
};
let mut sys = sysinfo::System::new();
let sys_pid = Pid::from_u32(pid);
let start_time = Instant::now();
let _msg_handle = start_message_timeout();
loop {
if Instant::now() > start_time + WAIT_TIMEOUT {
anyhow::bail!(
"Could not connect within {} timeout.",
WAIT_TIMEOUT.as_secs()
);
}
sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[sys_pid]), true);
let status = sys.process(sys_pid);
if status.is_none_or(|v| v.status() == ProcessStatus::Zombie) {
anyhow::bail!("Process {pid} exited before being able to connect!");
}
match MusicPlayerClient::connect(addr.clone()).await {
Err(err) => {
if let Some(os_err) = find_source::<std::io::Error>(&err) {
if os_err.kind() == std::io::ErrorKind::ConnectionRefused {
debug!("Connection refused found!");
tokio::time::sleep(WAIT_INTERVAL).await;
continue;
}
if os_err.kind() == std::io::ErrorKind::NotFound {
debug!("Socket File not found!");
tokio::time::sleep(WAIT_INTERVAL).await;
continue;
}
}
return Err(anyhow::anyhow!(err).context(addr));
}
Ok(client) => return Ok((client, addr)),
}
}
}
fn start_message_timeout() -> AbortOnDropHandle {
let handle = tokio::spawn(async {
tokio::time::sleep(WAIT_MESSAGE_TIME).await;
eprintln!("Connecting is taking more time than expected...");
});
AbortOnDropHandle(handle.abort_handle())
}
#[derive(Debug)]
struct AbortOnDropHandle(AbortHandle);
impl Drop for AbortOnDropHandle {
fn drop(&mut self) {
self.0.abort();
}
}
fn find_source<E: Error + 'static>(err: &dyn Error) -> Option<&E> {
let mut err = err.source();
while let Some(cause) = err {
if let Some(typed) = cause.downcast_ref() {
return Some(typed);
}
err = cause.source();
}
None
}
fn get_config(args: &cli::Args) -> Result<CombinedSettings> {
let config_server = ServerConfigVersionedDefaulted::from_config_path()?.into_settings();
let max_depth = args.max_depth.map(ScanDepth::Limited);
let music_dir = if let Some(ref dir) = args.music_directory {
Some(get_path(dir).context("resolving cli music-dir")?)
} else {
None
};
let overlay_server = ServerOverlay {
settings: config_server,
music_dir_overwrite: music_dir,
disable_discord_status: args.disable_discord,
metadata_scan_depth: max_depth,
};
let config_tui = TuiConfigVersionedDefaulted::from_config_path()?.into_settings();
let coverart_hidden_overwrite = if args.hide_cover { Some(true) } else { None };
let overlay_tui = TuiOverlay {
settings: config_tui,
coverart_hidden_overwrite,
cover_features: !args.disable_cover,
};
Ok(CombinedSettings {
server: new_shared_server_settings(overlay_server),
tui: new_shared_tui_settings(overlay_tui),
})
}
fn get_path(dir: &Path) -> Result<PathBuf> {
let mut path = dir.to_path_buf();
if path.exists() {
if !path.has_root()
&& let Ok(p_base) = std::env::current_dir()
{
path = p_base.join(path);
}
if let Ok(p_canonical) = path.canonicalize() {
path = p_canonical;
}
return Ok(path);
}
bail!("Error: non-existing directory '{}'", dir.display());
}
async fn execute_action(action: cli::Action, config: &CombinedSettings) -> Result<()> {
match action {
cli::Action::Import { file } => {
println!("need to import from file {}", file.display());
let path = get_path(&file).context("import cli file-path")?;
let config_dir_path =
utils::get_app_config_path().context("getting app-config-path")?;
let config_c = config.server.read().settings.podcast.clone();
podcast::import_from_opml(&config_dir_path, &config_c, &path)
.await
.context("import opml")?;
}
cli::Action::Export { file } => {
println!("need to export to file {}", file.display());
let path = utils::absolute_path(&file)?;
let config_dir_path =
utils::get_app_config_path().context("getting app-config-path")?;
podcast::export_to_opml(&config_dir_path, &path).context("export opml")?;
}
}
Ok(())
}
static TERMINAL_ALTERNATE_MODE: AtomicBool = AtomicBool::new(false);
fn ctrl_c_handler() -> Result<()> {
let last_ctrlc = Mutex::new(None);
ctrlc::set_handler(move || {
warn!("CTRL+C handler invoked! TUI key-handling not started or overwritten?");
let mut lock = last_ctrlc.lock();
let Some(val) = lock.as_ref() else {
*lock = Some(Instant::now());
return;
};
if val.elapsed() > Duration::from_secs(2) {
*lock = Some(Instant::now());
return;
}
error!("Exiting because of CTRL+C!");
if TERMINAL_ALTERNATE_MODE.load(Ordering::SeqCst) {
ui::model::Model::hook_reset_terminal();
}
std::process::exit(-1);
})?;
Ok(())
}