use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use costroid_core::{
collect_local_snapshot, now_summary, EngineSnapshot, HostEnv, NowOptions, NowSummary,
};
/// Refresh cadence of 30 seconds.
///
/// NOTE(review): nothing in this file reads the constant; presumably callers
/// pass it as the `interval` argument of [`due_for_refresh`] — confirm.
pub const REFRESH_INTERVAL: Duration = Duration::from_secs(30);
/// The payload of one successful collection: the raw snapshot together with
/// the summary computed from it.
pub struct Loaded {
/// Snapshot as returned by `collect_local_snapshot`.
pub snapshot: EngineSnapshot,
/// Summary produced by `now_summary` for `snapshot` with default options.
pub summary: NowSummary,
}
/// Result of one refresh attempt, as sent from the worker thread to the
/// owner of the [`RefreshWorker`].
pub enum RefreshOutcome {
/// Collection succeeded; the payload is boxed before being sent.
Ok(Box<Loaded>),
/// Collection (or starting the worker thread) failed; the string is a
/// human-readable reason.
Err(String),
}
/// Whether a refresh is currently outstanding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
/// No refresh is outstanding; this is the initial phase and the phase
/// after every `RefreshState::apply`.
Idle,
/// A refresh was requested (`RefreshState::mark_requested`) and its
/// outcome has not been applied yet.
InFlight,
}
/// Bookkeeping for the refresh cycle: current phase, last good data, last
/// error, and when the most recent attempt finished.
///
/// The default value is idle with no data, no error, and no completion time.
#[derive(Default)]
pub struct RefreshState {
// Wrapped in `PhaseCell` so the struct can derive `Default` (idle).
phase: PhaseCell,
// Most recent successfully loaded data; kept across failed refreshes.
loaded: Option<Loaded>,
// Reason of the most recent failure; cleared by the next success.
error: Option<String>,
// Completion instant passed to the most recent `apply`, success or failure.
last_completed: Option<Instant>,
}
/// Private newtype around [`Phase`] whose only job is to supply a `Default`
/// (idle), so that `RefreshState` can derive `Default`.
struct PhaseCell(Phase);

impl Default for PhaseCell {
    /// A freshly created cell is idle.
    fn default() -> Self {
        Self(Phase::Idle)
    }
}
impl RefreshState {
pub fn new() -> Self {
Self::default()
}
pub fn phase(&self) -> Phase {
self.phase.0
}
pub fn loaded(&self) -> Option<&Loaded> {
self.loaded.as_ref()
}
pub fn error(&self) -> Option<&str> {
self.error.as_deref()
}
pub fn has_data(&self) -> bool {
self.loaded.is_some()
}
pub fn since_last_completed(&self) -> Option<Duration> {
self.last_completed.map(|t| t.elapsed())
}
pub fn mark_requested(&mut self) {
self.phase = PhaseCell(Phase::InFlight);
}
pub fn apply(&mut self, outcome: RefreshOutcome, completed_at: Instant) {
self.phase = PhaseCell(Phase::Idle);
self.last_completed = Some(completed_at);
match outcome {
RefreshOutcome::Ok(loaded) => {
self.loaded = Some(*loaded);
self.error = None;
}
RefreshOutcome::Err(reason) => {
self.error = Some(reason);
}
}
}
}
/// Decides whether a new refresh should be started now.
///
/// A refresh is due only while `phase` is [`Phase::Idle`], and then either
/// when nothing has completed yet (`since_last` is `None`) or when at least
/// `interval` has elapsed since the last completion.
pub fn due_for_refresh(phase: Phase, since_last: Option<Duration>, interval: Duration) -> bool {
    // Never overlap with a refresh that is already in flight.
    if phase != Phase::Idle {
        return false;
    }
    match since_last {
        None => true,
        Some(elapsed) => elapsed >= interval,
    }
}
/// Handle to the background refresh thread: one channel carries refresh
/// requests to the thread, the other carries outcomes back.
pub struct RefreshWorker {
// Sends unit "please refresh" requests to the worker thread.
request_tx: Sender<()>,
// Receives the outcome of each refresh attempt.
outcome_rx: Receiver<RefreshOutcome>,
// Join handle of the worker thread; `None` when the thread could not be
// spawned. It is stored but never joined in this file.
_handle: Option<JoinHandle<()>>,
}
impl RefreshWorker {
    /// Starts the background thread named `costroid-bar-refresh` and returns
    /// the handle used to talk to it.
    ///
    /// If the thread cannot be started, no panic occurs: a single
    /// [`RefreshOutcome::Err`] describing the failure is queued so that the
    /// first [`RefreshWorker::poll`] reports it.
    pub fn spawn(ctx: egui::Context) -> Self {
        let (request_tx, request_rx) = mpsc::channel::<()>();
        let (outcome_tx, outcome_rx) = mpsc::channel::<RefreshOutcome>();

        // The thread gets its own sender; the original stays here so a spawn
        // failure can still be reported through the same channel.
        let thread_tx = outcome_tx.clone();
        let spawned = std::thread::Builder::new()
            .name("costroid-bar-refresh".to_owned())
            .spawn(move || worker_loop(&request_rx, &thread_tx, &ctx));

        if let Err(err) = &spawned {
            let _ = outcome_tx.send(RefreshOutcome::Err(format!(
                "could not start the refresh worker: {err}"
            )));
        }

        Self {
            request_tx,
            outcome_rx,
            _handle: spawned.ok(),
        }
    }

    /// Asks the worker to run a refresh. A send failure (the worker is gone)
    /// is deliberately ignored.
    pub fn request(&self) {
        let _ = self.request_tx.send(());
    }

    /// Returns the next finished outcome without blocking, or `None` when
    /// nothing is waiting (or the channel is disconnected).
    pub fn poll(&self) -> Option<RefreshOutcome> {
        self.outcome_rx.try_recv().ok()
    }
}
/// Body of the refresh thread.
///
/// Detects the host environment once, then serves refresh requests until
/// either channel is closed. Requests that piled up while waiting are
/// collapsed into a single collection. After each outcome is sent, a repaint
/// is requested on `ctx`.
fn worker_loop(
    request_rx: &Receiver<()>,
    outcome_tx: &Sender<RefreshOutcome>,
    ctx: &egui::Context,
) {
    let env = HostEnv::detect();
    // Iterating the receiver blocks for the next request and ends once every
    // sender has been dropped.
    for () in request_rx {
        // Discard any further requests already queued; one collect covers them.
        request_rx.try_iter().for_each(drop);

        let outcome = collect(&env);
        if outcome_tx.send(outcome).is_err() {
            // Nobody is listening for outcomes any more.
            break;
        }
        ctx.request_repaint();
    }
}
/// Runs one collection against `env` and packages the result.
///
/// On success the snapshot is summarised with default [`NowOptions`] and both
/// are returned boxed; on failure the error is rendered into a message
/// prefixed with `could not read local data: `.
fn collect(env: &HostEnv) -> RefreshOutcome {
    let snapshot = match collect_local_snapshot(env) {
        Ok(snapshot) => snapshot,
        Err(err) => {
            return RefreshOutcome::Err(format!("could not read local data: {err}"));
        }
    };
    let summary = now_summary(&snapshot, NowOptions::default());
    RefreshOutcome::Ok(Box::new(Loaded { snapshot, summary }))
}
// Unit tests for the pure parts of this module: the `due_for_refresh`
// predicate and the `RefreshState` transitions. The worker thread itself is
// not exercised here.
#[cfg(test)]
mod tests {
use super::*;
// Builds a `Loaded` from a snapshot with a fixed timestamp and empty
// collections, summarised with default options.
fn empty_loaded() -> Box<Loaded> {
let snapshot = EngineSnapshot {
generated_at: match chrono::DateTime::from_timestamp(1_900_000_000, 0) {
Some(dt) => dt,
None => panic!("invalid test timestamp"),
},
usage_events: Vec::new(),
focus_rows: Vec::new(),
limit_windows: Vec::new(),
providers: Vec::new(),
capabilities: Vec::new(),
};
let summary = now_summary(&snapshot, NowOptions::default());
Box::new(Loaded { snapshot, summary })
}
// Covers every combination of phase and elapsed time, including the
// boundary where the elapsed time equals the interval exactly.
#[test]
fn due_for_refresh_only_when_idle_and_interval_elapsed() {
let interval = Duration::from_secs(30);
assert!(
due_for_refresh(Phase::Idle, None, interval),
"first refresh (nothing collected yet) is due"
);
assert!(!due_for_refresh(
Phase::Idle,
Some(Duration::from_secs(5)),
interval
));
assert!(due_for_refresh(
Phase::Idle,
Some(Duration::from_secs(30)),
interval
));
assert!(due_for_refresh(
Phase::Idle,
Some(Duration::from_secs(90)),
interval
));
assert!(
!due_for_refresh(Phase::InFlight, None, interval),
"never start a second collect while one is in flight"
);
assert!(!due_for_refresh(
Phase::InFlight,
Some(Duration::from_secs(120)),
interval
));
}
// A freshly constructed state is idle and carries nothing.
#[test]
fn new_state_is_idle_and_empty() {
let state = RefreshState::new();
assert_eq!(state.phase(), Phase::Idle);
assert!(!state.has_data());
assert!(state.error().is_none());
assert!(state.since_last_completed().is_none());
}
// Happy path: request -> in flight -> success stores data and returns to idle.
#[test]
fn request_then_success_clears_error_and_stores_data() {
let mut state = RefreshState::new();
state.mark_requested();
assert_eq!(state.phase(), Phase::InFlight);
state.apply(RefreshOutcome::Ok(empty_loaded()), Instant::now());
assert_eq!(state.phase(), Phase::Idle);
assert!(state.has_data());
assert!(state.error().is_none());
assert!(state.since_last_completed().is_some());
}
// A failure after a success records the error but keeps the earlier data.
#[test]
fn failure_keeps_last_good_data_and_records_error() {
let mut state = RefreshState::new();
state.apply(RefreshOutcome::Ok(empty_loaded()), Instant::now());
assert!(state.has_data());
state.mark_requested();
state.apply(
RefreshOutcome::Err("could not read local data: boom".to_owned()),
Instant::now(),
);
assert_eq!(state.phase(), Phase::Idle);
assert!(
state.has_data(),
"a failed refresh must not drop the last good data"
);
assert_eq!(state.error(), Some("could not read local data: boom"));
}
// A success following a failure wipes the stored error.
#[test]
fn success_after_failure_clears_the_error() {
let mut state = RefreshState::new();
state.apply(RefreshOutcome::Err("boom".to_owned()), Instant::now());
assert_eq!(state.error(), Some("boom"));
state.apply(RefreshOutcome::Ok(empty_loaded()), Instant::now());
assert!(state.error().is_none());
assert!(state.has_data());
}
}