#[cfg(all(feature = "crossterm-backend", feature = "tokio-runtime"))]
mod inner {
use std::collections::HashMap;
use std::io;
use crossterm::event::{Event, EventStream};
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use crate::{Action, Tea};
pub async fn run<M, F>(model: M, event_to_msg: F) -> io::Result<()>
where
M: Tea,
F: Fn(Event) -> Option<M::Msg> + Send + 'static,
{
Runner::new(model, event_to_msg).run().await
}
pub struct Runner<M: Tea, F> {
model: M,
event_to_msg: F,
}
impl<M, F> Runner<M, F>
where
M: Tea,
F: Fn(Event) -> Option<M::Msg> + Send + 'static,
{
pub fn new(model: M, event_to_msg: F) -> Self {
Runner {
model,
event_to_msg,
}
}
pub async fn run(mut self) -> io::Result<()> {
let mut terminal = ratatui::try_init()?;
let result = self.event_loop(&mut terminal).await;
ratatui::try_restore()?;
result
}
async fn event_loop(
&mut self,
terminal: &mut ratatui::Terminal<ratatui::backend::CrosstermBackend<io::Stdout>>,
) -> io::Result<()> {
let (task_tx, mut task_rx) = mpsc::unbounded_channel::<M::Msg>();
let init_action = self.model.init();
Self::process_action(init_action, &task_tx);
terminal.draw(|f| self.model.view(f))?;
let mut events = EventStream::new();
let mut active_subs: HashMap<&'static str, JoinHandle<()>> = HashMap::new();
let sub_tx = task_tx.clone();
self.sync_subscriptions(&mut active_subs, &sub_tx);
loop {
let msg = tokio::select! {
Some(Ok(ev)) = events.next() => {
(self.event_to_msg)(ev)
}
Some(msg) = task_rx.recv() => {
Some(msg)
}
};
let Some(msg) = msg else { continue };
let cmd = self.model.update(msg);
let quit = matches!(cmd.action, Action::Quit);
Self::process_action(cmd.action, &task_tx);
if quit {
break;
}
self.sync_subscriptions(&mut active_subs, &sub_tx);
if cmd.dirty {
terminal.draw(|f| self.model.view(f))?;
}
}
for (_, handle) in active_subs.drain() {
handle.abort();
}
Ok(())
}
fn process_action(action: Action<M::Msg>, task_tx: &mpsc::UnboundedSender<M::Msg>) {
match action {
Action::None | Action::Quit => {}
Action::Task(fut) => {
let tx = task_tx.clone();
tokio::spawn(async move {
let msg = fut.await;
let _ = tx.send(msg);
});
}
Action::Batch(actions) => {
for action in actions {
Self::process_action(action, task_tx);
}
}
}
}
fn sync_subscriptions(
&self,
active: &mut HashMap<&'static str, JoinHandle<()>>,
task_tx: &mpsc::UnboundedSender<M::Msg>,
) {
let desired = self.model.subscriptions();
let desired_ids: std::collections::HashSet<&'static str> =
desired.iter().map(|s| s.id).collect();
active.retain(|id, handle| {
if desired_ids.contains(id) {
true
} else {
handle.abort();
false
}
});
for sub in desired {
if active.contains_key(sub.id) {
continue;
}
let tx = task_tx.clone();
let id = sub.id;
let mut stream = sub.into_stream();
let handle = tokio::spawn(async move {
while let Some(msg) = StreamExt::next(&mut stream).await {
if tx.send(msg).is_err() {
break;
}
}
});
active.insert(id, handle);
}
}
}
}
#[cfg(all(feature = "crossterm-backend", feature = "tokio-runtime"))]
pub use inner::*;