// copyright (C) 2022-2023 Nissa <and-nissa@protonmail.com>
// licensed under GPL-3.0-or-later
//! Defines a runtime block.
use tokio::select;
use tokio::signal;
use tokio::signal::unix::SignalKind;
use tokio::sync::{mpsc, Notify, RwLock, RwLockReadGuard};
use tokio::task::{self, JoinHandle};
use tokio::time::{self, Instant};
use tracing::{error, span, trace, warn, Instrument, Level};
use core::str::{self, FromStr};
use core::time::Duration;
use std::path::PathBuf;
use std::process::Command;
use std::sync::Arc;
use crate::config::TomlBlock;
use crate::protocol::Body;
/// Configured block at runtime, which communicates to a parent
/// [bar](crate::bar::Smolbar).
///
/// A block is made of a shared body plus three spawned tasks (`cmd`,
/// `interval` and `signal`). Each task is stored next to the handle used to
/// stop it, so that [`Block::halt`] can shut them down in order.
#[must_use]
#[derive(Debug)]
pub struct Block {
// current contents of the block. written by the `cmd` task, read through
// `Block::read`.
body: Arc<RwLock<Body>>,
// `cmd` is responsible for sending refresh msgs to the bar. it continues as
// long as it receives `true`, then it halts (on `false`). `cmd` expects
// `bar_refresh` to be alive. the sender half stored here is the one used to
// deliver the final `false`.
cmd: (mpsc::Sender<bool>, JoinHandle<()>),
// `interval` sends a refresh to `cmd` on an interval. if it receives a
// notification (through the stored `Notify`), it halts.
interval: (Arc<Notify>, JoinHandle<()>),
// `signal` sends a refresh to `cmd` any time it receives a certain os
// signal. if it receives a notification (through the stored `Notify`), it
// halts.
signal: (Arc<Notify>, JoinHandle<()>),
}
impl Block {
/// Initializes a new [`Block`].
///
/// Spawns the command, interval and signal tasks (in that order) and then
/// requests the first refresh of the block's body.
///
/// A `Block` expects that sending `true` through `bar_refresh` will trigger
/// a refresh of all blocks, and sending `false` will halt that loop.
#[allow(clippy::missing_panics_doc)]
pub async fn new(
    toml: TomlBlock,
    global: Body,
    bar_refresh: mpsc::Sender<bool>,
    cmd_dir: PathBuf,
    id: usize,
) -> Self {
    let config = Arc::new(toml);
    let body = Arc::new(RwLock::new(Body::new()));
    let (refresh_tx, refresh_rx) = mpsc::channel(1);

    // listens for body refresh msgs and fulfills them
    let cmd_task = Self::task_cmd(
        Arc::clone(&config),
        bar_refresh,
        global,
        Arc::clone(&body),
        refresh_rx,
        cmd_dir,
        id,
    )
    .await;

    // refreshes on an interval
    let interval_halt = Arc::new(Notify::new());
    let interval_task = Self::task_interval(
        Arc::clone(&config),
        Arc::clone(&interval_halt),
        refresh_tx.clone(),
        id,
    );

    // refreshes on a signal
    let signal_halt = Arc::new(Notify::new());
    let signal_task = Self::task_signal(
        Arc::clone(&signal_halt),
        refresh_tx.clone(),
        config.signal,
        id,
    );

    // request the first refresh. the cmd task must only halt in Self::halt;
    // if it could halt during this function, this send would panic.
    refresh_tx.send(true).await.unwrap();

    Self {
        body,
        cmd: (refresh_tx, cmd_task),
        interval: (interval_halt, interval_task),
        signal: (signal_halt, signal_task),
    }
}
#[allow(clippy::items_after_statements)]
async fn task_cmd(
toml: Arc<TomlBlock>,
bar_refresh: mpsc::Sender<bool>,
global: Body,
body: Arc<RwLock<Body>>,
mut cmd_recv: mpsc::Receiver<bool>,
cmd_dir: PathBuf,
id: usize,
) -> JoinHandle<()> {
async fn apply_scopes(
mut lines: str::Lines<'_>,
global: &Body,
toml: &Arc<TomlBlock>,
body: &Arc<RwLock<Body>>,
) {
let mut body = body.write().await;
fn update<T: Clone + FromStr>(
field: &mut Option<T>,
value: Option<&str>,
local: &Option<T>,
global: &Option<T>,
) {
*field = match value {
Some(val) => match val.parse() {
Ok(new) => Some(new),
Err(_) => None,
},
None => None,
}
.or_else(|| local.clone())
.or_else(|| global.clone());
}
update(
&mut body.full_text,
lines.next(),
&toml.body.full_text,
&global.full_text,
);
update(
&mut body.short_text,
lines.next(),
&toml.body.short_text,
&global.short_text,
);
update(
&mut body.color,
lines.next(),
&toml.body.color,
&global.color,
);
update(
&mut body.background,
lines.next(),
&toml.body.background,
&global.background,
);
update(
&mut body.border,
lines.next(),
&toml.body.border,
&global.border,
);
update(
&mut body.border_top,
lines.next(),
&toml.body.border_top,
&global.border_top,
);
update(
&mut body.border_bottom,
lines.next(),
&toml.body.border_bottom,
&global.border_bottom,
);
update(
&mut body.border_left,
lines.next(),
&toml.body.border_left,
&global.border_left,
);
update(
&mut body.border_right,
lines.next(),
&toml.body.border_right,
&global.border_right,
);
update(
&mut body.min_width,
lines.next(),
&toml.body.min_width,
&global.min_width,
);
update(
&mut body.align,
lines.next(),
&toml.body.align,
&global.align,
);
update(&mut body.name, lines.next(), &toml.body.name, &global.name);
update(
&mut body.instance,
lines.next(),
&toml.body.instance,
&global.instance,
);
update(
&mut body.urgent,
lines.next(),
&toml.body.urgent,
&global.urgent,
);
update(
&mut body.separator,
lines.next(),
&toml.body.separator,
&global.separator,
);
update(
&mut body.separator_block_width,
lines.next(),
&toml.body.separator_block_width,
&global.separator_block_width,
);
update(
&mut body.markup,
lines.next(),
&toml.body.markup,
&global.markup,
);
// full text is prefixed by `prefix`,
// postfixed by `postfix` field in toml
if let Some(ref prefix) = toml.prefix {
if let Some(full_text) = &body.full_text {
let mut prefix = prefix.to_string();
prefix.push_str(full_text);
body.full_text = Some(prefix);
}
}
if let Some(ref mut full_text) = body.full_text {
if let Some(postfix) = &toml.postfix {
full_text.push_str(postfix);
}
};
}
async fn ping_bar(bar_refresh: &mpsc::Sender<bool>) {
// the refresh receiver must not be dropped until cmd receives halt.
bar_refresh.send(true).await.unwrap();
trace!("refresh requested");
}
// respond to refresh requests
task::spawn(async move {
// initialize block body according to local and global scope
apply_scopes("".lines(), &global, &toml, &body).await;
{
let span = span!(Level::TRACE, "block_body_init", id, command = toml.command);
async {
ping_bar(&bar_refresh).await;
}
.instrument(span)
.await;
}
// senders must not be dropped until cmd_recv receives `false`.
while cmd_recv.recv().await.unwrap() {
if let Some(ref toml_command) = toml.command {
let mut command = Command::new(toml_command);
command.current_dir(&cmd_dir);
// run command and capture output for Body
select!(
// we may receive halt while the command is running
// (which could take arbitrary time)
maybe_halt = cmd_recv.recv() => {
if !maybe_halt.unwrap() {
// halt!
break;
}
}
// refresh block body
Ok(try_output) = task::spawn_blocking(move || command.output()) => {
let span = span!(
Level::TRACE,
"block_cmd_loop",
id,
command = toml_command,
);
let _enter = span.enter();
let immediate = match try_output {
Ok(output) => {
if !output.status.success() {
warn!(
code = output.status.code(),
"command exited with failure",
);
}
if let Ok(utf8) = String::from_utf8(output.stdout) {
Some(utf8)
} else {
error!(
"command produced invalid utf8",
);
None
}
}
Err(error) => {
error!("command error: {}", error);
None
}
}.unwrap_or_default();
// if command fails (and immedate == ""), this
// iterator will only yield None
let lines = immediate.lines();
// update body with scopes regardless of whether
// command succeeded
apply_scopes(lines, &global, &toml, &body).await;
// ping parent bar to let know we are refreshed
ping_bar(&bar_refresh).await;
}
);
} else {
// no command is set, the body cant change until config
// changes
}
}
})
}
fn task_interval(
toml: Arc<TomlBlock>,
halt: Arc<Notify>,
cmd_send: mpsc::Sender<bool>,
id: usize,
) -> JoinHandle<()> {
task::spawn(async move {
let mut yes_actually_exit = false;
let mut deadline = Instant::now();
let span = span!(Level::TRACE, "block_interval_loop", id);
match toml.interval.map(Duration::try_from_secs_f32) {
Some(Ok(mut timeout)) => {
if timeout == Duration::ZERO {
let _enter = span.enter();
error!("can't have timeout of zero");
} else {
if timeout < Duration::from_millis(1) {
let _enter = span.enter();
timeout = Duration::from_millis(1);
warn!("timeout was really small and clamped to a millisecond");
}
loop {
// NOTE: if an iteration is faster than `timeout`,
// deadline < Instant::now
let now = Instant::now();
while deadline < now {
if let Some(new) = deadline.checked_add(timeout) {
deadline = new;
} else {
let _enter = span.enter();
error!("deadline is unrepresentable");
break;
}
}
match time::timeout_at(deadline, halt.notified()).await {
Ok(()) => {
// we received halt msg
yes_actually_exit = true;
break;
}
Err(_) => {
// receiving halt msg timed out, so we
// refresh the body. this creates the
// behavior of refreshing the body at a
// specific interval until halting. cmd loop
// must not halt while interval loop is
// running.
cmd_send.send(true).await.unwrap();
}
}
}
}
}
Some(Err(error)) => {
let _enter = span.enter();
error!("invalid timeout: {}", error);
}
_ => (),
}
if !yes_actually_exit {
// wait for halt msg
halt.notified().await;
}
})
}
fn task_signal(
halt: Arc<Notify>,
cmd_send: mpsc::Sender<bool>,
signal: Option<i32>,
id: usize,
) -> JoinHandle<()> {
task::spawn(async move {
let mut yes_actually_exit = false;
let span = span!(Level::TRACE, "block_signal_loop", id, signal);
if let Some(sig) = signal {
let sig = SignalKind::from_raw(sig);
if let Ok(mut stream) = signal::unix::signal(sig) {
loop {
select!(
() = halt.notified() => {
// we received halt msg
yes_actually_exit = true;
break;
}
sig = stream.recv() => {
if sig.is_some() {
// refresh the body on receiving signal. the
// cmd loop must not halt while signal loop
// is running.
cmd_send.send(true).await.unwrap();
}
}
);
}
} else {
let _enter = span.enter();
error!("invalid signal");
}
}
if !yes_actually_exit {
// wait for halt msg
halt.notified().await;
}
})
}
/// Lock the body for reading and return a guard to it.
///
/// Waits until the read lock on the body can be acquired. The lock is held
/// for as long as the returned guard is alive, and the guard borrows from
/// this `Block`.
pub async fn read(&self) -> RwLockReadGuard<'_, Body> {
    self.body.read().await
}
/// Gracefully halt the block, consuming it.
#[allow(clippy::missing_panics_doc)]
pub async fn halt(self) {
// halt interval and signal tasks. both loops must exclusively be halted
// here.
self.interval.0.notify_one();
self.signal.0.notify_one();
// both loops must not panic
self.interval.1.await.unwrap();
self.signal.1.await.unwrap();
// halt cmd channel, after interval/signal tasks are done
// NOTE: if `cmd` halts while `interval` or `signal` are alive, they
// will fail to send a refresh to `cmd`
self.cmd.0.send(false).await.unwrap();
// cmd loop must not panic
self.cmd.1.await.unwrap();
}
}