use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::{Duration, Instant};
use color_eyre::eyre::Result;
use futures::stream::StreamExt;
use ratatui::prelude::Backend;
use tokio::sync::mpsc;
use tokio::time::{Interval, MissedTickBehavior, interval};
use crate::{
application::Application,
command::{Action, Command},
subscription::SubscriptionManager,
};
struct ApplicationState<App: Application> {
app: App,
msg_tx: mpsc::UnboundedSender<App::Message>,
msg_rx: mpsc::UnboundedReceiver<App::Message>,
quit_tx: mpsc::UnboundedSender<()>,
quit_rx: mpsc::UnboundedReceiver<()>,
subscription_manager: SubscriptionManager<App::Message>,
}
pub struct Runtime<App: Application> {
state: ApplicationState<App>,
frame_interval: Interval,
needs_redraw: bool,
subscription_ids_hash: Option<u64>,
}
impl<App: Application> Runtime<App> {
pub fn new(flags: App::Flags, frame_rate: u32) -> Self {
let state = ApplicationState::new(flags);
let frame_duration = Duration::from_millis(1000 / u64::from(frame_rate));
let mut frame_interval = interval(frame_duration);
frame_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
Self {
state,
frame_interval,
needs_redraw: true, subscription_ids_hash: None, }
}
fn process_message_batch(&mut self, first_msg: App::Message) {
let cmd = self.state.app.update(first_msg);
self.state.enqueue_command(cmd);
let batch_deadline = Instant::now() + Duration::from_micros(100);
while Instant::now() < batch_deadline {
match self.state.msg_rx.try_recv() {
Ok(msg) => {
let cmd = self.state.app.update(msg);
self.state.enqueue_command(cmd);
}
Err(_) => break, }
}
self.needs_redraw = true;
}
fn process_frame_tick<B: Backend>(
&mut self,
terminal: &mut ratatui::Terminal<B>,
) -> Result<bool, <B as Backend>::Error> {
if self.needs_redraw {
self.state.render(terminal)?;
self.needs_redraw = false;
}
self.update_subscriptions();
Ok(self.state.check_quit())
}
fn update_subscriptions(&mut self) {
let subscriptions = self.state.app.subscriptions();
let mut hasher = DefaultHasher::new();
for sub in &subscriptions {
sub.id.hash(&mut hasher);
}
let current_hash = hasher.finish();
if self.subscription_ids_hash != Some(current_hash) {
self.state.subscription_manager.update(subscriptions);
self.subscription_ids_hash = Some(current_hash);
}
}
pub async fn run<B: Backend>(
mut self,
terminal: &mut ratatui::Terminal<B>,
) -> Result<(), <B as Backend>::Error> {
self.state.initialize_subscriptions();
loop {
tokio::select! {
Some(msg) = self.state.msg_rx.recv() => {
self.process_message_batch(msg);
}
_ = self.frame_interval.tick() => {
if self.process_frame_tick(terminal)? {
break;
}
}
_ = self.state.quit_rx.recv() => {
break;
}
}
}
self.state.shutdown();
Ok(())
}
}
impl<App: Application> ApplicationState<App> {
#[must_use]
fn new(flags: App::Flags) -> Self {
let (msg_tx, msg_rx) = mpsc::unbounded_channel();
let (quit_tx, quit_rx) = mpsc::unbounded_channel();
let subscription_manager = SubscriptionManager::new(msg_tx.clone());
let (app, init_cmd) = App::new(flags);
let runtime = Self {
app,
msg_tx,
msg_rx,
quit_tx,
quit_rx,
subscription_manager,
};
runtime.enqueue_command(init_cmd);
runtime
}
fn enqueue_command(&self, cmd: Command<App::Message>) {
if let Some(stream) = cmd.stream {
let msg_tx = self.msg_tx.clone();
let quit_tx = self.quit_tx.clone();
tokio::spawn(async move {
futures::pin_mut!(stream);
while let Some(action) = stream.next().await {
match action {
Action::Message(msg) => {
let _ = msg_tx.send(msg);
}
Action::Quit => {
let _ = quit_tx.send(());
break;
}
}
}
});
}
}
fn check_quit(&mut self) -> bool {
self.quit_rx.try_recv().is_ok()
}
fn initialize_subscriptions(&mut self) {
let subscriptions = self.app.subscriptions();
self.subscription_manager.update(subscriptions);
}
fn render<B: Backend>(
&self,
terminal: &mut ratatui::Terminal<B>,
) -> Result<(), <B as Backend>::Error> {
terminal.draw(|frame| {
self.app.view(frame);
})?;
Ok(())
}
fn shutdown(&mut self) {
self.subscription_manager.shutdown();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::application::Application;
use crate::command::{Action, Command};
use crate::subscription::Subscription;
use crate::subscription::time::Timer;
use ratatui::backend::TestBackend;
use ratatui::prelude::*;
use tokio::time::{Duration, sleep};
#[derive(Debug)]
struct TestApp {
counter: i32,
should_quit: bool,
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
enum TestMessage {
Increment,
Quit,
}
impl Application for TestApp {
type Message = TestMessage;
type Flags = i32;
fn new(initial: i32) -> (Self, Command<Self::Message>) {
(
Self {
counter: initial,
should_quit: false,
},
Command::none(),
)
}
fn update(&mut self, msg: Self::Message) -> Command<Self::Message> {
match msg {
TestMessage::Increment => {
self.counter += 1;
Command::none()
}
TestMessage::Quit => {
self.should_quit = true;
Command::effect(Action::Quit)
}
}
}
fn view(&self, _frame: &mut Frame<'_>) {
}
fn subscriptions(&self) -> Vec<Subscription<Self::Message>> {
vec![]
}
}
#[test]
fn test_runtime_new() {
let runtime = ApplicationState::<TestApp>::new(42);
assert_eq!(runtime.app.counter, 42);
}
#[test]
fn test_runtime_new_with_zero() {
let runtime = ApplicationState::<TestApp>::new(0);
assert_eq!(runtime.app.counter, 0);
}
#[test]
fn test_runtime_new_initializes_channels() {
let runtime = ApplicationState::<TestApp>::new(0);
assert_eq!(runtime.app.counter, 0);
}
struct AppWithInitCommand {
initialized: bool,
}
impl Application for AppWithInitCommand {
type Message = bool;
type Flags = ();
fn new(_flags: ()) -> (Self, Command<Self::Message>) {
let cmd = Command::future(async { true });
(Self { initialized: false }, cmd)
}
fn update(&mut self, msg: Self::Message) -> Command<Self::Message> {
self.initialized = msg;
Command::none()
}
fn view(&self, _frame: &mut Frame<'_>) {}
fn subscriptions(&self) -> Vec<Subscription<Self::Message>> {
vec![]
}
}
#[tokio::test]
async fn test_runtime_processes_init_command() {
let runtime = ApplicationState::<AppWithInitCommand>::new(());
sleep(Duration::from_millis(50)).await;
assert!(!runtime.app.initialized);
}
#[test]
fn test_runtime_enqueue_command_none() {
let runtime = ApplicationState::<TestApp>::new(0);
runtime.enqueue_command(Command::none());
}
#[tokio::test]
async fn test_runtime_enqueue_command_with_message() {
let runtime = ApplicationState::<TestApp>::new(0);
let cmd = Command::future(async { TestMessage::Increment });
runtime.enqueue_command(cmd);
sleep(Duration::from_millis(50)).await;
}
#[tokio::test]
async fn test_runtime_enqueue_command_with_quit() {
let runtime = ApplicationState::<TestApp>::new(0);
let cmd = Command::effect(Action::Quit);
runtime.enqueue_command(cmd);
sleep(Duration::from_millis(50)).await;
}
#[test]
fn test_multiple_runtimes() {
let runtime1 = ApplicationState::<TestApp>::new(1);
let runtime2 = ApplicationState::<TestApp>::new(2);
assert_eq!(runtime1.app.counter, 1);
assert_eq!(runtime2.app.counter, 2);
}
struct AppWithStringFlags {
name: String,
}
impl Application for AppWithStringFlags {
type Message = ();
type Flags = String;
fn new(name: String) -> (Self, Command<Self::Message>) {
(Self { name }, Command::none())
}
fn update(&mut self, _msg: Self::Message) -> Command<Self::Message> {
Command::none()
}
fn view(&self, _frame: &mut Frame<'_>) {}
fn subscriptions(&self) -> Vec<Subscription<Self::Message>> {
vec![]
}
}
#[test]
fn test_runtime_with_string_flags() {
let runtime = ApplicationState::<AppWithStringFlags>::new("test".to_string());
assert_eq!(runtime.app.name, "test");
}
#[test]
fn test_runtime_with_empty_string_flags() {
let runtime = ApplicationState::<AppWithStringFlags>::new(String::new());
assert_eq!(runtime.app.name, "");
}
#[test]
fn test_check_quit_no_signal() {
let mut runtime = ApplicationState::<TestApp>::new(0);
assert!(!runtime.check_quit());
}
#[test]
fn test_check_quit_with_signal() {
let mut runtime = ApplicationState::<TestApp>::new(0);
let _ = runtime.quit_tx.send(());
assert!(runtime.check_quit());
}
#[test]
fn test_check_quit_multiple_signals() {
let mut runtime = ApplicationState::<TestApp>::new(0);
let _ = runtime.quit_tx.send(());
let _ = runtime.quit_tx.send(());
assert!(runtime.check_quit());
assert!(runtime.check_quit());
assert!(!runtime.check_quit());
}
#[tokio::test]
async fn test_initialize_subscriptions() {
struct AppWithSubs;
impl Application for AppWithSubs {
type Message = ();
type Flags = ();
fn new((): ()) -> (Self, Command<()>) {
(Self, Command::none())
}
fn update(&mut self, (): ()) -> Command<()> {
Command::none()
}
fn view(&self, _frame: &mut Frame<'_>) {}
fn subscriptions(&self) -> Vec<Subscription<()>> {
vec![Subscription::new(Timer::new(100)).map(|_| ())]
}
}
let mut runtime = ApplicationState::<AppWithSubs>::new(());
runtime.initialize_subscriptions();
}
#[test]
fn test_initialize_subscriptions_empty() {
let mut runtime = ApplicationState::<TestApp>::new(0);
runtime.initialize_subscriptions();
}
#[test]
fn test_render() -> Result<()> {
let runtime = ApplicationState::<TestApp>::new(0);
let backend = TestBackend::new(80, 24);
let mut terminal = Terminal::new(backend)?;
assert!(runtime.render(&mut terminal).is_ok());
Ok(())
}
#[test]
fn test_render_multiple_times() -> Result<()> {
let runtime = ApplicationState::<TestApp>::new(0);
let backend = TestBackend::new(80, 24);
let mut terminal = Terminal::new(backend)?;
assert!(runtime.render(&mut terminal).is_ok());
assert!(runtime.render(&mut terminal).is_ok());
assert!(runtime.render(&mut terminal).is_ok());
Ok(())
}
#[test]
fn test_shutdown() {
let mut runtime = ApplicationState::<TestApp>::new(0);
runtime.shutdown();
}
#[tokio::test]
async fn test_shutdown_after_initialize_subscriptions() {
struct AppWithSubs;
impl Application for AppWithSubs {
type Message = ();
type Flags = ();
fn new((): ()) -> (Self, Command<()>) {
(Self, Command::none())
}
fn update(&mut self, (): ()) -> Command<()> {
Command::none()
}
fn view(&self, _frame: &mut Frame<'_>) {}
fn subscriptions(&self) -> Vec<Subscription<()>> {
use crate::subscription::time::Timer;
vec![Subscription::new(Timer::new(100)).map(|_| ())]
}
}
let mut runtime = ApplicationState::<AppWithSubs>::new(());
runtime.initialize_subscriptions();
runtime.shutdown();
}
#[tokio::test]
async fn test_event_loop_new() {
let runtime = Runtime::<TestApp>::new(0, 60);
assert_eq!(runtime.state.app.counter, 0);
}
#[tokio::test]
async fn test_event_loop_new_with_different_frame_rates() {
let _runtime1 = Runtime::<TestApp>::new(0, 30);
let _runtime2 = Runtime::<TestApp>::new(0, 144);
}
#[tokio::test]
async fn test_event_loop_process_message_batch_single_message() {
let mut runtime = Runtime::<TestApp>::new(0, 60);
runtime.process_message_batch(TestMessage::Increment);
assert_eq!(runtime.state.app.counter, 1);
assert!(runtime.needs_redraw);
}
#[tokio::test]
async fn test_event_loop_process_message_batch_with_batching() {
let mut runtime = Runtime::<TestApp>::new(0, 60);
let _ = runtime.state.msg_tx.send(TestMessage::Increment);
let _ = runtime.state.msg_tx.send(TestMessage::Increment);
let _ = runtime.state.msg_tx.send(TestMessage::Increment);
sleep(Duration::from_millis(10)).await;
runtime.process_message_batch(TestMessage::Increment);
assert_eq!(runtime.state.app.counter, 4);
}
#[tokio::test]
async fn test_event_loop_process_frame_tick_renders_when_needed() -> Result<()> {
let mut runtime = Runtime::<TestApp>::new(0, 60);
let backend = TestBackend::new(80, 24);
let mut terminal = Terminal::new(backend)?;
assert!(runtime.needs_redraw);
let should_quit = runtime.process_frame_tick(&mut terminal)?;
assert!(!should_quit);
assert!(!runtime.needs_redraw);
Ok(())
}
#[tokio::test]
async fn test_event_loop_process_frame_tick_skips_render_when_not_needed() -> Result<()> {
let mut runtime = Runtime::<TestApp>::new(0, 60);
let backend = TestBackend::new(80, 24);
let mut terminal = Terminal::new(backend)?;
runtime.needs_redraw = false;
let should_quit = runtime.process_frame_tick(&mut terminal)?;
assert!(!should_quit);
assert!(!runtime.needs_redraw);
Ok(())
}
#[tokio::test]
async fn test_event_loop_process_frame_tick_detects_quit() -> Result<()> {
let mut runtime = Runtime::<TestApp>::new(0, 60);
let _ = runtime.state.quit_tx.send(());
let backend = TestBackend::new(80, 24);
let mut terminal = Terminal::new(backend)?;
runtime.needs_redraw = false;
let should_quit = runtime.process_frame_tick(&mut terminal)?;
assert!(should_quit);
Ok(())
}
#[tokio::test]
async fn test_event_loop_process_frame_tick_updates_subscriptions() -> Result<()> {
struct DynamicApp {
enabled: bool,
}
impl Application for DynamicApp {
type Message = ();
type Flags = bool;
fn new(enabled: bool) -> (Self, Command<Self::Message>) {
(Self { enabled }, Command::none())
}
fn update(&mut self, (): ()) -> Command<Self::Message> {
Command::none()
}
fn view(&self, _frame: &mut Frame<'_>) {}
fn subscriptions(&self) -> Vec<Subscription<Self::Message>> {
if self.enabled {
vec![Subscription::new(Timer::new(100)).map(|_| ())]
} else {
vec![]
}
}
}
let mut runtime = Runtime::<DynamicApp>::new(true, 60);
let backend = TestBackend::new(80, 24);
let mut terminal = Terminal::new(backend)?;
runtime.needs_redraw = false;
assert_eq!(runtime.subscription_ids_hash, None);
runtime.process_frame_tick(&mut terminal)?;
assert!(runtime.subscription_ids_hash.is_some());
Ok(())
}
}