mod builder;
mod config;
pub(crate) mod terminal;
mod virtual_terminal;
pub use builder::RuntimeBuilder;
pub use config::{RuntimeConfig, TerminalHook};
use std::io::Stdout;
use crate::error;
use std::pin::Pin;
use ratatui::Terminal;
use ratatui::backend::{Backend, CrosstermBackend};
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use super::command::{BoxedError, Command, CommandHandler};
use super::model::App;
use super::runtime_core::{ProcessEventResult, RuntimeCore};
use super::subscription::{BoxedSubscription, Subscription};
use crate::backend::CaptureBackend;
use crate::input::EventQueue;
use crate::overlay::{Overlay, OverlayStack};
use crate::theme::Theme;
/// Event-loop driver that ties an [`App`] to a ratatui [`Backend`].
///
/// Holds the application state and terminal (inside `core`), the command
/// handler fed by `A::update`, and the channels and cancellation token that
/// are handed to spawned commands and subscriptions.
pub struct Runtime<A: App, B: Backend> {
/// State, terminal, event queue, overlay stack, theme and quit flag.
core: RuntimeCore<A, B>,
/// Executes the commands returned by `A::update` and buffers their follow-up work.
commands: CommandHandler<A::Message>,
/// Settings captured at construction; `run` reads `tick_rate` and `frame_rate` from it.
config: RuntimeConfig,
/// Sending half of the message channel; cloned for subscriptions, spawned commands and callers.
message_tx: mpsc::Sender<A::Message>,
/// Receiving half of the message channel; drained by `run` and `process_async_messages`.
message_rx: mpsc::Receiver<A::Message>,
/// Sending half of the error channel; cloned for spawned commands and callers.
error_tx: mpsc::Sender<BoxedError>,
/// Receiving half of the error channel; drained by `take_errors`.
error_rx: mpsc::Receiver<BoxedError>,
/// Token passed to subscriptions and spawned commands; cancelled by `quit`.
cancel_token: CancellationToken,
}
/// A [`Runtime`] whose backend is a [`CrosstermBackend`] writing to [`Stdout`].
pub type TerminalRuntime<A> = Runtime<A, CrosstermBackend<Stdout>>;
/// A [`Runtime`] whose backend is the crate's [`CaptureBackend`].
// NOTE(review): presumably used for headless/test rendering — confirm against `CaptureBackend`.
pub type VirtualRuntime<A> = Runtime<A, CaptureBackend>;
impl<A: App, B: Backend> Runtime<A, B> {
/// Assembles a runtime from an explicit backend, initial state, initial
/// command and configuration.
///
/// Both the message channel and the error channel are created with
/// `config.message_channel_capacity`. `init_cmd` is executed on a fresh
/// [`CommandHandler`] before the runtime is assembled, and any pending
/// commands it produced are spawned before the runtime is returned.
///
/// # Errors
///
/// Propagates the error returned by [`Terminal::new`].
// NOTE(review): `spawn_pending_commands` presumably needs an active Tokio
// runtime — confirm against `CommandHandler::spawn_pending`.
pub(crate) fn with_backend_state_and_config(
    backend: B,
    state: A::State,
    init_cmd: Command<A::Message>,
    config: RuntimeConfig,
) -> error::Result<Self> {
    // Terminal construction is the only fallible step, so it goes first.
    let terminal = Terminal::new(backend)?;

    let capacity = config.message_channel_capacity;
    let (message_tx, message_rx) = mpsc::channel(capacity);
    let (error_tx, error_rx) = mpsc::channel(capacity);
    let cancel_token = CancellationToken::new();

    let mut commands = CommandHandler::new();
    commands.execute(init_cmd);

    let core = RuntimeCore {
        state,
        terminal,
        events: EventQueue::new(),
        overlay_stack: OverlayStack::new(),
        theme: Theme::default(),
        should_quit: false,
        max_messages_per_tick: config.max_messages_per_tick,
    };

    let mut built = Self {
        core,
        commands,
        config,
        message_tx,
        message_rx,
        error_tx,
        error_rx,
        cancel_token,
    };
    // Kick off whatever asynchronous work the initial command queued.
    built.spawn_pending_commands();
    Ok(built)
}
/// Returns a shared reference to the application state.
pub fn state(&self) -> &A::State {
&self.core.state
}
/// Returns a mutable reference to the application state.
///
/// Changes made through this reference bypass `A::update`; no command is
/// executed as a result of them.
pub fn state_mut(&mut self) -> &mut A::State {
&mut self.core.state
}
/// Returns a shared reference to the ratatui [`Terminal`] owned by the runtime.
pub fn terminal(&self) -> &Terminal<B> {
&self.core.terminal
}
/// Returns a mutable reference to the ratatui [`Terminal`] owned by the runtime.
pub fn terminal_mut(&mut self) -> &mut Terminal<B> {
&mut self.core.terminal
}
/// Returns a shared reference to the terminal's backend.
pub fn backend(&self) -> &B {
self.core.terminal.backend()
}
/// Returns a mutable reference to the terminal's backend.
pub fn backend_mut(&mut self) -> &mut B {
self.core.terminal.backend_mut()
}
/// Returns a mutable reference to the runtime's [`EventQueue`].
// NOTE(review): presumably how callers enqueue input events that
// `process_event` later consumes — confirm against `RuntimeCore::process_event`.
pub fn events(&mut self) -> &mut EventQueue {
&mut self.core.events
}
/// Returns a clone of the runtime's [`CancellationToken`], the same token
/// that is handed to subscriptions and spawned commands.
pub fn cancellation_token(&self) -> CancellationToken {
self.cancel_token.clone()
}
/// Returns a clone of the sender for the runtime's message channel.
///
/// Messages sent through it are dispatched when the runtime drains the
/// channel (in `run` or `process_async_messages`).
pub fn message_sender(&self) -> mpsc::Sender<A::Message> {
self.message_tx.clone()
}
/// Returns a clone of the sender for the runtime's error channel.
///
/// Errors sent through it can be collected with `take_errors`.
pub fn error_sender(&self) -> mpsc::Sender<BoxedError> {
self.error_tx.clone()
}
/// Drains the error channel without blocking and returns everything that
/// was queued, in the order it was received.
///
/// Stops at the first failed `try_recv`, so the result is empty when no
/// errors are waiting.
pub fn take_errors(&mut self) -> Vec<BoxedError> {
    std::iter::from_fn(|| self.error_rx.try_recv().ok()).collect()
}
/// Returns `true` when the error channel currently holds at least one
/// error that `take_errors` has not yet collected.
pub fn has_errors(&self) -> bool {
!self.error_rx.is_empty()
}
/// Registers a single subscription with the runtime.
///
/// The subscription is boxed, converted into a stream bound to the
/// runtime's cancellation token, and forwarded into the message channel by
/// a spawned task.
pub fn subscribe(&mut self, subscription: impl Subscription<A::Message>) {
    #[cfg(feature = "tracing")]
    tracing::info!("registering subscription");
    let boxed = Box::new(subscription);
    let stream = boxed.into_stream(self.cancel_token.clone());
    let sender = self.message_tx.clone();
    let token = self.cancel_token.clone();
    Self::spawn_subscription(stream, sender, token);
}
/// Registers every subscription in `subscriptions`, in order.
///
/// Each one is converted into a stream bound to the runtime's cancellation
/// token and forwarded into the message channel by its own spawned task.
pub fn subscribe_all(&mut self, subscriptions: Vec<BoxedSubscription<A::Message>>) {
    #[cfg(feature = "tracing")]
    tracing::info!(count = subscriptions.len(), "registering subscriptions");
    subscriptions.into_iter().for_each(|sub| {
        let token = self.cancel_token.clone();
        let stream = sub.into_stream(token.clone());
        Self::spawn_subscription(stream, self.message_tx.clone(), token);
    });
}
/// Spawns a Tokio task that forwards every item of `stream` into `msg_tx`.
///
/// The task ends when the stream is exhausted, when `cancel` is cancelled
/// while waiting for the next item, or when sending fails because the
/// receiving side of the channel is gone.
fn spawn_subscription(
    stream: Pin<Box<dyn tokio_stream::Stream<Item = A::Message> + Send>>,
    msg_tx: mpsc::Sender<A::Message>,
    cancel: CancellationToken,
) {
    tokio::spawn(async move {
        let mut source = stream;
        loop {
            // Wait for either the next item or cancellation; cancellation is
            // folded into `None` so both exits share one path.
            let next = tokio::select! {
                item = source.next() => item,
                _ = cancel.cancelled() => None,
            };
            let msg = match next {
                Some(msg) => msg,
                None => break,
            };
            if msg_tx.send(msg).await.is_err() {
                break;
            }
        }
    });
}
/// Feeds one message through `A::update` and executes the command it
/// returns.
///
/// If the command handler reports a quit request afterwards, the runtime's
/// quit flag is raised. Pending asynchronous commands are spawned before
/// returning.
pub fn dispatch(&mut self, msg: A::Message) {
    #[cfg(feature = "tracing")]
    let _span = tracing::debug_span!("dispatch").entered();
    let follow_up = A::update(&mut self.core.state, msg);
    self.commands.execute(follow_up);
    // Only ever raises the flag; an already-set flag stays set.
    self.core.should_quit |= self.commands.should_quit();
    self.spawn_pending_commands();
}
/// Dispatches every message yielded by `messages`, one after another, in
/// iteration order.
pub fn dispatch_all(&mut self, messages: impl IntoIterator<Item = A::Message>) {
    messages.into_iter().for_each(|msg| self.dispatch(msg));
}
/// Hands the command handler fresh clones of the message sender, error
/// sender and cancellation token so it can spawn its pending commands.
fn spawn_pending_commands(&mut self) {
    let messages = self.message_tx.clone();
    let errors = self.error_tx.clone();
    let cancel = self.cancel_token.clone();
    self.commands.spawn_pending(messages, errors, cancel);
}
/// Applies everything the command handler has buffered, in a fixed order:
///
/// 1. dispatch buffered messages,
/// 2. push requested overlays,
/// 3. pop the requested number of overlays,
/// 4. spawn dynamically requested subscriptions,
/// 5. answer cancellation-token requests by dispatching the message each
///    callback builds from a clone of the runtime's token.
///
/// Work buffered by the dispatches performed here is not picked up again
/// within the same call for the steps that have already run.
pub fn process_commands(&mut self) {
    let queued = self.commands.take_messages();
    self.dispatch_all(queued);

    let pushes = self.commands.take_overlay_pushes();
    for overlay in pushes {
        self.core.overlay_stack.push(overlay);
    }

    let pop_count = self.commands.take_overlay_pops();
    for _ in 0..pop_count {
        self.core.overlay_stack.pop();
    }

    let new_subscriptions = self.commands.take_subscriptions();
    if !new_subscriptions.is_empty() {
        #[cfg(feature = "tracing")]
        tracing::info!(
            count = new_subscriptions.len(),
            "registering dynamic subscriptions"
        );
        for sub in new_subscriptions {
            let token = self.cancel_token.clone();
            let stream = sub.into_stream(token.clone());
            Self::spawn_subscription(stream, self.message_tx.clone(), token);
        }
    }

    for make_msg in self.commands.take_cancel_token_requests() {
        let token = self.cancel_token.clone();
        self.dispatch(make_msg(token));
    }
}
/// Dispatches every message currently waiting in the message channel,
/// without blocking; returns as soon as `try_recv` fails.
fn process_async_messages(&mut self) {
    #[cfg(feature = "tracing")]
    let _span = tracing::debug_span!("process_async_messages").entered();
    loop {
        match self.message_rx.try_recv() {
            Ok(msg) => self.dispatch(msg),
            Err(_) => break,
        }
    }
}
/// Renders a frame by delegating to the runtime core.
///
/// # Errors
///
/// Returns whatever error `RuntimeCore::render` reports.
pub fn render(&mut self) -> error::Result<()> {
self.core.render()
}
/// Asks the core to process one event and dispatches the message it
/// produced, if any.
///
/// Returns `false` when the core reported no event, and `true` when an
/// event was consumed (with or without a resulting message).
pub fn process_event(&mut self) -> bool {
    let produced = match self.core.process_event() {
        ProcessEventResult::NoEvent => return false,
        ProcessEventResult::Consumed => None,
        ProcessEventResult::Dispatch(msg) => Some(msg),
    };
    if let Some(msg) = produced {
        self.dispatch(msg);
    }
    true
}
/// Keeps calling `process_event` until it reports that no event was
/// handled. No per-tick limit is applied here.
pub fn process_all_events(&mut self) {
    loop {
        if !self.process_event() {
            break;
        }
    }
}
/// Runs one synchronous iteration of the runtime.
///
/// In order: applies buffered command output, dispatches waiting async
/// messages, processes at most `max_messages_per_tick` events, dispatches
/// the message returned by `A::on_tick` (if any), raises the quit flag when
/// `A::should_quit` says so, and finally renders.
///
/// # Errors
///
/// Returns the error produced by `render`.
pub fn tick(&mut self) -> error::Result<()> {
    #[cfg(feature = "tracing")]
    let _span = tracing::debug_span!("tick").entered();
    self.process_commands();
    self.process_async_messages();

    // Check the budget *before* pulling the next event. With the operands
    // the other way round, one extra event is consumed after the budget is
    // already exhausted (and one event is consumed even for a budget of 0).
    let budget = self.core.max_messages_per_tick;
    let mut messages_processed = 0;
    while messages_processed < budget && self.process_event() {
        messages_processed += 1;
    }
    #[cfg(feature = "tracing")]
    if messages_processed > 0 {
        tracing::debug!(messages_processed, "tick: processed events");
    }

    if let Some(msg) = A::on_tick(&self.core.state) {
        self.dispatch(msg);
    }
    if A::should_quit(&self.core.state) {
        self.core.should_quit = true;
    }
    self.render()
}
/// Returns `true` once the runtime's quit flag has been raised.
pub fn should_quit(&self) -> bool {
self.core.should_quit
}
/// Requests shutdown: raises the quit flag and cancels the runtime's
/// cancellation token, which subscriptions and spawned commands were given.
pub fn quit(&mut self) {
#[cfg(feature = "tracing")]
tracing::info!("runtime quit requested");
self.core.should_quit = true;
// Signals every holder of a clone of this token.
self.cancel_token.cancel();
}
/// Runs the asynchronous event loop until the quit flag is raised.
///
/// An initial frame is rendered, then the loop waits on four sources:
///
/// * a message arriving on the message channel, which is dispatched;
/// * the tick interval (`config.tick_rate`), which applies buffered command
///   output, processes at most `max_messages_per_tick` events, dispatches
///   the `A::on_tick` message and consults `A::should_quit`;
/// * the render interval (`config.frame_rate`), which renders a frame;
/// * cancellation of the runtime's token, which raises the quit flag.
///
/// After the loop ends a final frame is rendered and `A::on_exit` is called.
///
/// # Errors
///
/// Returns the first error produced by `render`. In that case the function
/// returns immediately and `A::on_exit` is not called.
pub async fn run(&mut self) -> error::Result<()> {
    #[cfg(feature = "tracing")]
    tracing::info!("starting runtime event loop");
    let mut tick_interval = tokio::time::interval(self.config.tick_rate);
    let mut render_interval = tokio::time::interval(self.config.frame_rate);
    // Draw once up front so something is on screen before the first wait.
    self.render()?;
    loop {
        tokio::select! {
            Some(msg) = self.message_rx.recv() => {
                #[cfg(feature = "tracing")]
                tracing::debug!("runtime received async message");
                self.dispatch(msg);
            }
            _ = tick_interval.tick() => {
                self.process_commands();
                // Check the budget *before* pulling the next event. With the
                // operands the other way round, one extra event is consumed
                // after the budget is already exhausted.
                let budget = self.core.max_messages_per_tick;
                let mut messages_processed = 0;
                while messages_processed < budget && self.process_event() {
                    messages_processed += 1;
                }
                if let Some(msg) = A::on_tick(&self.core.state) {
                    self.dispatch(msg);
                }
                if A::should_quit(&self.core.state) {
                    self.core.should_quit = true;
                }
            }
            _ = render_interval.tick() => {
                self.render()?;
            }
            _ = self.cancel_token.cancelled() => {
                #[cfg(feature = "tracing")]
                tracing::info!("runtime received cancellation");
                self.core.should_quit = true;
            }
        }
        // The flag is checked after every wake-up, whichever branch ran.
        if self.core.should_quit {
            #[cfg(feature = "tracing")]
            tracing::info!("runtime shutting down");
            break;
        }
    }
    // Final frame so the last state change is visible before exit.
    self.render()?;
    A::on_exit(&self.core.state);
    Ok(())
}
/// Calls `tick` up to `ticks` times, stopping early as soon as the quit
/// flag is set.
///
/// # Errors
///
/// Returns the first error produced by `tick`; remaining ticks are skipped.
pub fn run_ticks(&mut self, ticks: usize) -> error::Result<()> {
    let mut remaining = ticks;
    while remaining > 0 && !self.core.should_quit {
        self.tick()?;
        remaining -= 1;
    }
    Ok(())
}
/// Applies buffered command output and then dispatches any waiting async
/// messages. Unlike `tick`, this neither processes events nor renders.
pub fn process_pending(&mut self) {
#[cfg(feature = "tracing")]
let _span = tracing::debug_span!("process_pending").entered();
self.process_commands();
self.process_async_messages();
}
/// Pushes `overlay` onto the runtime's overlay stack by delegating to the core.
pub fn push_overlay(&mut self, overlay: Box<dyn Overlay<A::Message>>) {
self.core.push_overlay(overlay);
}
/// Pops an overlay by delegating to the core and returns it, or `None` if
/// the core had nothing to pop.
// NOTE(review): presumably removes the most recently pushed overlay — confirm
// against `RuntimeCore::pop_overlay`.
pub fn pop_overlay(&mut self) -> Option<Box<dyn Overlay<A::Message>>> {
self.core.pop_overlay()
}
/// Clears the runtime's overlays by delegating to the core.
pub fn clear_overlays(&mut self) {
self.core.clear_overlays();
}
/// Reports whether the core currently has any overlays.
pub fn has_overlays(&self) -> bool {
self.core.has_overlays()
}
/// Returns the number of overlays reported by the core.
pub fn overlay_count(&self) -> usize {
self.core.overlay_count()
}
/// Replaces the runtime's current [`Theme`] with `theme`.
pub fn set_theme(&mut self, theme: Theme) {
self.core.theme = theme;
}
/// Returns a shared reference to the runtime's current [`Theme`].
pub fn theme(&self) -> &Theme {
&self.core.theme
}
}
#[cfg(test)]
mod tests;