mod command_executor;
pub mod error_handler;
mod event_processor;
mod fps_limiter;
mod priority_event_processor;
mod terminal_manager;
pub use command_executor::CommandExecutor;
pub use event_processor::EventProcessor;
pub use fps_limiter::FpsLimiter;
pub use priority_event_processor::{
get_event_stats, EventStats, PriorityConfig, PriorityEventProcessor,
};
pub use terminal_manager::{TerminalConfig, TerminalManager};
use crate::async_handle::AsyncHandle;
use crate::panic_recovery::{self, PanicRecoveryStrategy};
use crate::resource_limits::ResourceLimits;
use crate::string_renderer::StringRenderer;
use crate::subscription::Subscription;
use crossterm::event::{self};
use hojicha_core::core::Model;
use hojicha_core::error::{Error, Result};
use hojicha_core::event::Event;
use std::io::{self, Read, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};
/// Boxed event filter installed with `Program::with_filter`.
///
/// It receives the current model and an incoming event and returns
/// `Some(event)` to hand an event to the model's update step, or `None` to
/// drop it.
type MessageFilter<M> = Box<
dyn Fn(&M, Event<<M as Model>::Message>) -> Option<Event<<M as Model>::Message>> + Send + Sync,
>;
/// Boxed predicate over the model; the event loop stops once it returns `true`
/// (see `run_until`).
type ConditionCheck<M> = Box<dyn FnMut(&M) -> bool>;
/// Mouse reporting mode requested for the terminal.
///
/// The selected mode is copied into the terminal configuration when a program
/// is constructed. `Eq` and `Hash` are derived in addition to `PartialEq` so
/// the mode can be used as a map/set key and in `Eq`-bounded generic code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum MouseMode {
    /// Mouse reporting disabled (the default).
    #[default]
    None,
    /// Cell-motion reporting.
    // NOTE(review): presumably reports motion only while a button is held — confirm
    // against the terminal manager.
    CellMotion,
    /// All-motion reporting.
    // NOTE(review): presumably reports every pointer movement — confirm against the
    // terminal manager.
    AllMotion,
}
/// Configuration consumed by `Program::with_options`.
///
/// Build it with `ProgramOptions::new()` / `default()` and the chained
/// builder methods.
pub struct ProgramOptions {
/// Forwarded to the terminal configuration as `alt_screen`.
pub alt_screen: bool,
/// Mouse reporting mode forwarded to the terminal configuration.
pub mouse_mode: MouseMode,
/// Forwarded to the terminal configuration as `bracketed_paste`.
pub bracketed_paste: bool,
/// Forwarded to the terminal configuration as `focus_reporting`.
pub focus_reporting: bool,
/// Frame rate handed to the FPS limiter.
pub fps: u16,
/// When set, no terminal input thread is started and events are processed
/// through the headless path.
pub headless: bool,
/// Whether a signal handler should be installed.
// NOTE(review): this flag is not read anywhere in this file — confirm where it is consumed.
pub install_signal_handler: bool,
/// When set, the view is never rendered and no terminal input thread is started.
pub without_renderer: bool,
/// Optional custom output sink.
// NOTE(review): not read in this file — confirm where it is consumed.
pub output: Option<Box<dyn Write + Send + Sync>>,
/// Optional custom input source.
// NOTE(review): not read in this file — confirm where it is consumed.
pub input: Option<Box<dyn Read + Send + Sync>>,
/// Strategy passed to the panic-recovery wrappers around init/update/view.
pub panic_recovery_strategy: PanicRecoveryStrategy,
/// Limits handed to the command executor at construction time.
pub resource_limits: ResourceLimits,
}
impl ProgramOptions {
    /// Creates the default option set: alternate screen on, mouse reporting
    /// off, bracketed paste and focus reporting off, 60 fps, signal handler
    /// enabled, rendering enabled, no custom I/O streams, and default panic
    /// recovery strategy and resource limits.
    pub fn new() -> Self {
        Self {
            // Terminal presentation.
            alt_screen: true,
            mouse_mode: MouseMode::None,
            bracketed_paste: false,
            focus_reporting: false,
            fps: 60,
            // Runtime behaviour.
            headless: false,
            install_signal_handler: true,
            without_renderer: false,
            // No custom I/O streams by default.
            output: None,
            input: None,
            // Safety nets.
            panic_recovery_strategy: PanicRecoveryStrategy::default(),
            resource_limits: ResourceLimits::default(),
        }
    }

    /// Sets whether the alternate screen is requested.
    pub fn with_alt_screen(self, enable: bool) -> Self {
        Self {
            alt_screen: enable,
            ..self
        }
    }

    /// Selects the mouse reporting mode.
    pub fn with_mouse_mode(self, mode: MouseMode) -> Self {
        Self {
            mouse_mode: mode,
            ..self
        }
    }

    /// Sets whether bracketed paste is requested.
    pub fn with_bracketed_paste(self, enable: bool) -> Self {
        Self {
            bracketed_paste: enable,
            ..self
        }
    }

    /// Sets whether focus reporting is requested.
    pub fn with_focus_reporting(self, enable: bool) -> Self {
        Self {
            focus_reporting: enable,
            ..self
        }
    }

    /// Sets the frame rate handed to the FPS limiter.
    pub fn with_fps(self, fps: u16) -> Self {
        Self { fps, ..self }
    }

    /// Turns on headless mode.
    pub fn headless(self) -> Self {
        Self {
            headless: true,
            ..self
        }
    }

    /// Clears the `install_signal_handler` flag.
    pub fn without_signal_handler(self) -> Self {
        Self {
            install_signal_handler: false,
            ..self
        }
    }

    /// Disables rendering of the model's view.
    pub fn without_renderer(self) -> Self {
        Self {
            without_renderer: true,
            ..self
        }
    }

    /// Chooses the strategy used by the panic-recovery wrappers.
    pub fn with_panic_recovery(self, strategy: PanicRecoveryStrategy) -> Self {
        Self {
            panic_recovery_strategy: strategy,
            ..self
        }
    }

    /// Stores a custom output sink, replacing any previous one.
    pub fn with_output(self, output: Box<dyn Write + Send + Sync>) -> Self {
        Self {
            output: Some(output),
            ..self
        }
    }

    /// Stores a custom input source, replacing any previous one.
    pub fn with_input(self, input: Box<dyn Read + Send + Sync>) -> Self {
        Self {
            input: Some(input),
            ..self
        }
    }

    /// Uses an in-memory copy of `input`'s bytes as the input source.
    pub fn with_input_string(self, input: &str) -> Self {
        use std::io::Cursor;
        let bytes = input.as_bytes().to_vec();
        self.with_input(Box::new(Cursor::new(bytes)))
    }

    /// Sets the resource limits handed to the command executor.
    pub fn with_resource_limits(self, limits: ResourceLimits) -> Self {
        Self {
            resource_limits: limits,
            ..self
        }
    }
}
// `Default` yields exactly the same configuration as `ProgramOptions::new()`.
impl Default for ProgramOptions {
fn default() -> Self {
Self::new()
}
}
/// Runtime that owns a model and drives its init/update/view cycle.
pub struct Program<M: Model> {
// The application model being driven.
model: M,
// Options the program was constructed with.
options: ProgramOptions,
// Owns terminal setup, release/restore and cleanup.
terminal_manager: TerminalManager,
// Executes commands returned by the model and spawns async tasks.
command_executor: CommandExecutor<M::Message>,
// Decides when a frame may be rendered.
fps_limiter: FpsLimiter,
// Sending half of the bounded message channel; `None` until the channel is
// created by `init_async_bridge` or by the run loop.
message_tx: Option<mpsc::SyncSender<Event<M::Message>>>,
// Receiving half of the message channel; taken by the run loop when it starts.
message_rx: Option<mpsc::Receiver<Event<M::Message>>>,
// Pulls the next event from the message/terminal channels and keeps statistics.
priority_processor: PriorityEventProcessor<M::Message>,
// Optional filter applied to every event before it reaches the model.
filter: Option<MessageFilter<M>>,
// Set to `true` while the run loop is active; shared with the input thread.
running: Arc<AtomicBool>,
// Set by `kill()` (and on drop) to stop the run loop and the input thread.
force_quit: Arc<AtomicBool>,
// Handle of the terminal input thread, joined on drop.
input_thread: Option<thread::JoinHandle<()>>,
// Renders the string produced by the model's view.
string_renderer: StringRenderer,
}
impl<M: Model> Program<M>
where
M::Message: Clone,
{
/// Creates a program for `model` using `ProgramOptions::default()`.
///
/// # Errors
/// Propagates any error returned by `Program::with_options`.
pub fn new(model: M) -> Result<Self> {
Self::with_options(model, ProgramOptions::default())
}
/// Creates a program for `model` configured by `options`.
///
/// The terminal manager is configured as headless when either `headless` or
/// `without_renderer` is set. The message channel is not created here; it is
/// set up lazily by `init_async_bridge` or when the program runs.
///
/// # Errors
/// Returns the error from `TerminalManager::new` or from
/// `CommandExecutor::with_resource_limits` if either fails.
pub fn with_options(model: M, options: ProgramOptions) -> Result<Self> {
    let terminal_manager = TerminalManager::new(TerminalConfig {
        alt_screen: options.alt_screen,
        mouse_mode: options.mouse_mode,
        bracketed_paste: options.bracketed_paste,
        focus_reporting: options.focus_reporting,
        headless: options.headless || options.without_renderer,
    })?;

    let limits = options.resource_limits.clone();
    let command_executor = CommandExecutor::with_resource_limits(limits)?;
    let fps_limiter = FpsLimiter::new(options.fps);
    let priority_processor = PriorityEventProcessor::new();
    log::info!("Hojicha program initialized with priority event processing");

    // Both flags start cleared; `running` is raised only once the loop starts.
    let running = Arc::new(AtomicBool::new(false));
    let force_quit = Arc::new(AtomicBool::new(false));

    Ok(Self {
        model,
        options,
        terminal_manager,
        command_executor,
        fps_limiter,
        message_tx: None,
        message_rx: None,
        priority_processor,
        filter: None,
        running,
        force_quit,
        input_thread: None,
        string_renderer: StringRenderer::new(),
    })
}
/// Installs an event filter, replacing any previously installed one.
///
/// During the run loop every event other than `Event::Quit` is passed through
/// the filter together with the current model; an event it returns as `Some`
/// is forwarded to the model's update step, and `None` drops the event.
pub fn with_filter<F>(mut self, filter: F) -> Self
where
F: Fn(&M, Event<M::Message>) -> Option<Event<M::Message>> + Send + Sync + 'static,
{
self.filter = Some(Box::new(filter));
self
}
/// Replaces the priority event processor with one built from `config`.
///
/// The previous processor is dropped wholesale, so anything configured on it
/// earlier does not carry over.
pub fn with_priority_config(mut self, config: PriorityConfig) -> Self {
self.priority_processor = PriorityEventProcessor::with_config(config);
log::debug!("Priority processor configured with custom settings");
self
}
/// Returns the event statistics reported by the priority event processor.
pub fn event_stats(&self) -> EventStats {
self.priority_processor.stats()
}
/// Returns the priority processor's statistics as a string produced by
/// `get_event_stats`.
pub fn event_stats_string(&self) -> String {
get_event_stats(&self.priority_processor)
}
/// Writes pre-formatted arguments to standard error (no trailing newline) and
/// flushes it. A flush failure is ignored.
pub fn printf(&self, args: std::fmt::Arguments) {
eprint!("{args}");
let _ = io::stderr().flush();
}
/// Writes `text` followed by a newline to standard error and flushes it.
/// A flush failure is ignored.
pub fn println(&self, text: &str) {
eprintln!("{text}");
let _ = io::stderr().flush();
}
/// Requests a graceful stop by sending `Event::Quit` on the message channel.
///
/// Does nothing when the channel has not been created yet. A send error is
/// ignored. The channel is a bounded `SyncSender`, so the call blocks while
/// the queue is full.
pub fn quit(&self) {
if let Some(tx) = &self.message_tx {
let _ = tx.send(Event::Quit);
}
}
/// Forces a stop without going through the event queue: raises `force_quit`
/// and clears `running`. The run loop and the input thread both poll these flags.
pub fn kill(&self) {
self.force_quit.store(true, Ordering::SeqCst);
self.running.store(false, Ordering::SeqCst);
}
/// Returns the advanced metrics reported by the priority event processor.
pub fn metrics(&self) -> crate::metrics::AdvancedEventStats {
self.priority_processor.advanced_metrics()
}
/// Returns the resource statistics reported by the command executor.
pub fn resource_stats(&self) -> crate::resource_limits::ResourceStats {
self.command_executor.resource_stats()
}
/// Returns the metrics collector's `export_json()` output.
pub fn metrics_json(&self) -> String {
self.priority_processor.metrics_collector().export_json()
}
/// Returns the metrics collector's `export_prometheus()` output.
pub fn metrics_prometheus(&self) -> String {
self.priority_processor
.metrics_collector()
.export_prometheus()
}
/// Returns the metrics collector's `export_text()` output.
pub fn metrics_text(&self) -> String {
self.priority_processor.metrics_collector().export_text()
}
/// Formats the processor's current advanced metrics with
/// `crate::metrics::display_dashboard` and returns the resulting string.
pub fn metrics_dashboard(&self) -> String {
    crate::metrics::display_dashboard(&self.priority_processor.advanced_metrics())
}
/// Resets the statistics kept by the priority event processor.
pub fn reset_metrics(&self) {
self.priority_processor.reset_stats();
}
/// Asks the priority event processor to resize its queue to `new_size`.
///
/// # Errors
/// A failure reported by the processor is boxed into `Error::Custom`.
pub fn resize_queue(&self, new_size: usize) -> Result<()> {
self.priority_processor
.resize_queue(new_size)
.map_err(|e| Error::Custom(Box::new(e)))
}
/// Returns the queue capacity reported by the priority event processor.
pub fn queue_capacity(&self) -> usize {
self.priority_processor.queue_capacity()
}
/// Enables queue auto-scaling on the priority event processor with `config`.
///
/// Returns `&mut Self` so calls can be chained on an existing program.
pub fn enable_auto_scaling(
&mut self,
config: crate::queue_scaling::AutoScaleConfig,
) -> &mut Self {
self.priority_processor.enable_auto_scaling(config);
self
}
/// Builder-style variant of `enable_auto_scaling` that uses
/// `AutoScaleConfig::default()`.
pub fn with_auto_scaling(mut self) -> Self {
self.priority_processor
.enable_auto_scaling(crate::queue_scaling::AutoScaleConfig::default());
self
}
/// Disables queue auto-scaling on the priority event processor.
///
/// Returns `&mut Self` so calls can be chained.
pub fn disable_auto_scaling(&mut self) -> &mut Self {
self.priority_processor.disable_auto_scaling();
self
}
/// Returns a clone of the message sender, or `None` if the message channel
/// has not been created yet (see `init_async_bridge`).
pub fn sender(&self) -> Option<mpsc::SyncSender<Event<M::Message>>> {
self.message_tx.clone()
}
pub fn send_message(&self, msg: M::Message) -> Result<()> {
self.message_tx
.as_ref()
.ok_or_else(|| {
Error::from(io::Error::new(
io::ErrorKind::NotConnected,
"Program not running",
))
})?
.send(Event::User(msg))
.map_err(|_| {
Error::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"Receiver disconnected",
))
})
}
/// Blocks the calling thread until the program has started and then stopped.
///
/// Phase one polls every 1 ms until `running` becomes `true`; phase two polls
/// every 10 ms until it becomes `false` again. A raised `force_quit` flag ends
/// either phase immediately.
pub fn wait(&self) {
    let is_running = || self.running.load(Ordering::SeqCst);
    let killed = || self.force_quit.load(Ordering::SeqCst);

    // Phase 1: wait for the run loop to start.
    while !(is_running() || killed()) {
        thread::sleep(Duration::from_millis(1));
    }

    // Phase 2: wait for the run loop to finish.
    while is_running() && !killed() {
        thread::sleep(Duration::from_millis(10));
    }
}
/// Calls `release()` on the terminal manager.
///
/// # Errors
/// Converts and returns the terminal manager's error.
pub fn release_terminal(&mut self) -> Result<()> {
self.terminal_manager.release().map_err(Error::from)
}
/// Calls `restore()` on the terminal manager (the counterpart of
/// `release_terminal`).
///
/// # Errors
/// Converts and returns the terminal manager's error.
pub fn restore_terminal(&mut self) -> Result<()> {
self.terminal_manager.restore().map_err(Error::from)
}
/// Ensures the message channel exists and returns a sender for it.
///
/// If a sender is already stored, a clone of it is returned. Otherwise a
/// bounded channel with capacity 100 is created, both halves are stored on the
/// program, and the new sender is returned. The run loop later adopts the
/// stored receiver, so messages sent before the program runs are not lost.
pub fn init_async_bridge(&mut self) -> mpsc::SyncSender<Event<M::Message>> {
    if let Some(existing) = &self.message_tx {
        return existing.clone();
    }

    let (tx, rx) = mpsc::sync_channel(100);
    self.message_rx = Some(rx);
    self.message_tx = Some(tx.clone());
    tx
}
pub fn subscribe<S>(&mut self, stream: S) -> Subscription
where
S: futures::Stream<Item = M::Message> + Send + 'static,
M::Message: Send + 'static,
{
use futures::StreamExt;
use tokio_util::sync::CancellationToken;
if self.message_tx.is_none() {
self.init_async_bridge();
}
let sender = self
.message_tx
.as_ref()
.expect("message_tx should be Some after init_async_bridge")
.clone();
let cancel_token = CancellationToken::new();
let cancel_clone = cancel_token.clone();
let handle = self.command_executor.spawn(async move {
tokio::pin!(stream);
loop {
tokio::select! {
_ = cancel_clone.cancelled() => {
break; }
item = stream.next() => {
match item {
Some(msg) => {
if sender.send(Event::User(msg)).is_err() {
break; }
}
None => {
break; }
}
}
}
}
});
Subscription::new(handle, cancel_token)
}
/// Spawns the future produced by `f` on the command executor.
///
/// `f` receives a clone of a fresh cancellation token; the returned
/// `AsyncHandle` is built from the spawned task's handle and the original
/// token. Observing the token is the future's own responsibility.
pub fn spawn_cancellable<F, Fut, T>(&self, f: F) -> AsyncHandle<T>
where
    F: FnOnce(tokio_util::sync::CancellationToken) -> Fut,
    Fut: std::future::Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    use tokio_util::sync::CancellationToken;

    let cancel_token = CancellationToken::new();
    let future = f(cancel_token.clone());
    let handle = self.command_executor.spawn(future);
    AsyncHandle::new(handle, cancel_token)
}
/// Like `spawn_cancellable`, but `f` also receives a sender for the program's
/// message channel so the task can post events itself.
///
/// The message channel is created on demand.
pub fn spawn_cancellable_cmd<F, Fut>(&mut self, f: F) -> AsyncHandle<()>
where
    F: FnOnce(tokio_util::sync::CancellationToken, mpsc::SyncSender<Event<M::Message>>) -> Fut,
    Fut: std::future::Future<Output = ()> + Send + 'static,
    M::Message: Send + 'static,
{
    use tokio_util::sync::CancellationToken;

    // Returns the existing sender, or creates the channel first.
    let sender = self.init_async_bridge();
    let cancel_token = CancellationToken::new();
    let future = f(cancel_token.clone(), sender);
    let handle = self.command_executor.spawn(future);
    AsyncHandle::new(handle, cancel_token)
}
/// Runs `fut` on the command executor and, if it resolves to `Some(msg)`,
/// delivers `msg` to the model as an `Event::User`.
///
/// The message channel is created on demand. A failed delivery is ignored and
/// the spawned task's handle is discarded.
pub fn spawn<Fut>(&mut self, fut: Fut)
where
    Fut: std::future::Future<Output = Option<M::Message>> + Send + 'static,
    M::Message: Send + 'static,
{
    // Returns the existing sender, or creates the channel first.
    let sender = self.init_async_bridge();
    self.command_executor.spawn(async move {
        if let Some(msg) = fut.await {
            let _ = sender.send(Event::User(msg));
        }
    });
}
/// Calls `callback` on every tick of a timer with the given `interval` and
/// delivers each returned message to the model via `subscribe`.
///
/// A zero `interval` is clamped to one millisecond: `tokio::time::interval`
/// panics on a zero period, and that panic would otherwise occur inside the
/// spawned task, leaving a subscription that never produces a message.
///
/// Returns the `Subscription` that controls the underlying task.
pub fn subscribe_interval<F>(&mut self, interval: Duration, mut callback: F) -> Subscription
where
    F: FnMut() -> M::Message + Send + 'static,
    M::Message: Send + 'static,
{
    // Only the zero case is altered; every non-zero period is used unchanged.
    let period = if interval.is_zero() {
        Duration::from_millis(1)
    } else {
        interval
    };

    let stream = async_stream::stream! {
        let mut ticker = tokio::time::interval(period);
        loop {
            ticker.tick().await;
            yield callback();
        }
    };
    self.subscribe(stream)
}
/// Subscribes to a stream of `Result`s, turning each `Ok` item into a message
/// with `on_item` and each `Err` into a message with `on_error`.
///
/// The mapped stream is handed to `subscribe`, whose `Subscription` is returned.
pub fn subscribe_with_error<S, T, E, F, G>(
    &mut self,
    stream: S,
    on_item: F,
    on_error: G,
) -> Subscription
where
    S: futures::Stream<Item = std::result::Result<T, E>> + Send + 'static,
    F: Fn(T) -> M::Message + Send + 'static,
    G: Fn(E) -> M::Message + Send + 'static,
    M::Message: Send + 'static,
{
    use futures::StreamExt;

    // Both callbacks are owned by the closure and borrowed for each item.
    let messages = stream.map(move |outcome| outcome.map_or_else(&on_error, &on_item));
    self.subscribe(messages)
}
/// Runs the program until it quits or `timeout` has elapsed, whichever comes
/// first. The clock starts when this method is called.
///
/// # Errors
/// Propagates any error from the event loop.
pub fn run_with_timeout(self, timeout: Duration) -> Result<()> {
    self.run_internal(Some(timeout), Some(Instant::now()), None)
}
/// Runs the program until it quits or `condition` returns `true`.
///
/// The predicate is evaluated against the model once per loop iteration,
/// after that iteration's event (if any) has been handled.
///
/// # Errors
/// Propagates any error from the event loop.
pub fn run_until<F>(self, condition: F) -> Result<()>
where
F: FnMut(&M) -> bool + 'static,
{
self.run_with_condition(Some(Box::new(condition)))
}
/// Runs the program with no timeout and no stop condition; it ends on a quit
/// event/command or a forced kill.
///
/// # Errors
/// Propagates any error from the event loop.
pub fn run(self) -> Result<()> {
self.run_internal(None, None, None)
}
// Runs the event loop with an optional boxed stop condition and no timeout.
fn run_with_condition(self, condition: Option<ConditionCheck<M>>) -> Result<()> {
self.run_internal(None, None, condition)
}
/// Shared event-loop implementation behind `run`, `run_with_timeout` and
/// `run_until`.
///
/// * `timeout` / `start_time` - when both are `Some`, the loop exits once
///   `start_time.elapsed() >= timeout`.
/// * `condition` - when `Some`, it is evaluated against the model at the end of
///   each iteration's event handling; returning `true` exits the loop.
///
/// # Errors
/// Returns an error if rendering the view or cleaning up the terminal fails.
fn run_internal(
mut self,
timeout: Option<Duration>,
start_time: Option<Instant>,
mut condition: Option<ConditionCheck<M>>,
) -> Result<()> {
// Mark the program as running; `wait()` and the input thread observe this flag.
self.running.store(true, Ordering::SeqCst);
// Bounded channel carrying raw terminal events from the input thread.
let (crossterm_tx, crossterm_rx) = mpsc::sync_channel(100);
// Reuse the channel created by `init_async_bridge` if there is one so that
// messages queued before the run are kept; otherwise create it now.
let (message_tx, message_rx) = if let Some(rx) = self.message_rx.take() {
let tx = self
.message_tx
.as_ref()
.expect("message_tx should be Some when message_rx is Some")
.clone();
(tx, rx)
} else {
let (tx, rx) = mpsc::sync_channel(100);
self.message_tx = Some(tx.clone());
(tx, rx)
};
// Terminal input is only read when neither headless nor renderer-less mode is set.
if !self.options.headless && !self.options.without_renderer {
let running = Arc::clone(&self.running);
let force_quit = Arc::clone(&self.force_quit);
let input_thread = thread::spawn(move || loop {
if !running.load(Ordering::SeqCst) || force_quit.load(Ordering::SeqCst) {
break;
}
// Poll with a 100 ms timeout so the stop flags above are re-checked regularly.
// Poll/read errors are treated as "no event".
if event::poll(Duration::from_millis(100)).unwrap_or(false) {
if let Ok(event) = event::read() {
let _ = crossterm_tx.send(event);
}
}
});
self.input_thread = Some(input_thread);
}
// Run the model's init step through the panic-recovery wrapper.
let init_cmd =
panic_recovery::safe_init(&mut self.model, self.options.panic_recovery_strategy);
// Init asked to quit: tear down immediately without entering the loop.
if init_cmd.is_quit() {
self.running.store(false, Ordering::SeqCst);
self.terminal_manager.cleanup().map_err(Error::from)?;
return Ok(());
}
if !init_cmd.is_noop() {
self.command_executor.execute(init_cmd, &message_tx);
}
// Default time budget for waiting on the next event.
let tick_rate = Duration::from_millis(250);
loop {
// A forced kill takes effect at the top of every iteration.
if self.force_quit.load(Ordering::SeqCst) {
break;
}
if let (Some(timeout), Some(start)) = (timeout, start_time) {
if start.elapsed() >= timeout {
break; }
}
// With a timeout configured, wait at most 10 ms per iteration so the
// deadline above is checked frequently.
let event_timeout = if timeout.is_some() {
Duration::from_millis(10) } else {
tick_rate
};
// NOTE(review): only `headless` selects the headless path here, while the
// input thread is also skipped for `without_renderer`; in that case the
// terminal-event channel never receives anything — confirm this is intended.
let event = if self.options.headless {
self.priority_processor
.process_events_headless(&message_rx, event_timeout)
} else {
self.priority_processor
.process_events(&message_rx, &crossterm_rx, event_timeout)
};
if let Some(event) = event {
// Quit is handled before the filter, so a filter cannot suppress it.
if matches!(event, Event::Quit) {
break;
}
let event = if let Some(ref filter) = self.filter {
filter(&self.model, event)
} else {
Some(event)
};
if let Some(event) = event {
let cmd = panic_recovery::safe_update(
&mut self.model,
event,
self.options.panic_recovery_strategy,
);
if cmd.is_quit() {
break;
}
if !cmd.is_noop() {
self.command_executor.execute(cmd, &message_tx);
}
}
}
// The stop condition is checked every iteration, even when no event arrived.
if let Some(ref mut cond) = condition {
if cond(&self.model) {
break; }
}
// Render only when rendering is enabled and the FPS limiter allows a frame.
if !self.options.without_renderer && self.fps_limiter.should_render() {
let (view_string, should_quit) =
panic_recovery::safe_view(&self.model, self.options.panic_recovery_strategy);
if should_quit {
break;
}
self.string_renderer
.render(view_string)
.map_err(Error::from)?;
self.fps_limiter.mark_rendered();
}
}
log::info!(
"Program shutting down. Final stats: {}",
self.event_stats_string()
);
// Clearing `running` lets the input thread exit; it is joined when `self` is dropped.
self.running.store(false, Ordering::SeqCst);
self.terminal_manager.cleanup().map_err(Error::from)?;
Ok(())
}
}
// Best-effort teardown that runs on every exit path, including early error returns.
impl<M: Model> Drop for Program<M> {
fn drop(&mut self) {
// A cleanup error is ignored; there is no caller to report it to during drop.
// NOTE(review): the run loop also calls `cleanup()` on normal exit, so this is
// presumably safe to call twice — confirm in the terminal manager.
let _ = self.terminal_manager.cleanup();
// The input thread's loop checks both flags, so setting them lets it exit.
self.running.store(false, Ordering::SeqCst);
self.force_quit.store(true, Ordering::SeqCst);
// Join the input thread (if one was started); a panic inside it is ignored.
if let Some(thread) = self.input_thread.take() {
let _ = thread.join();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
// Chains the flag/mode builder methods and checks that each one sets its field.
#[test]
fn test_program_options_all_methods() {
let options = ProgramOptions::default()
.with_mouse_mode(MouseMode::CellMotion)
.with_alt_screen(true)
.with_bracketed_paste(true)
.with_focus_reporting(true)
.with_fps(120)
.headless()
.without_signal_handler()
.without_renderer();
assert_eq!(options.mouse_mode, MouseMode::CellMotion);
assert!(options.alt_screen);
assert!(options.bracketed_paste);
assert!(options.focus_reporting);
assert_eq!(options.fps, 120);
assert!(options.headless);
assert!(!options.install_signal_handler);
assert!(options.without_renderer);
}
// The derived `Default` for `MouseMode` must be the `None` variant.
#[test]
fn test_mouse_mode_default() {
assert_eq!(MouseMode::default(), MouseMode::None);
}
// Constructs a headless program and drops it without running it; the test
// passes if construction succeeds and `Drop` completes without panicking.
#[test]
fn test_program_drop() {
use hojicha_core::core::Cmd;
struct TestModel;
impl Model for TestModel {
type Message = ();
fn update(&mut self, _: Event<Self::Message>) -> Cmd<Self::Message> {
Cmd::noop()
}
fn view(&self) -> String {
"TestModel".to_string()
}
}
let options = ProgramOptions::default().headless();
{
// The inner scope forces the drop to happen inside the test body.
let _program = Program::with_options(TestModel, options).unwrap();
}
}
// Smoke test: calls the control and printing methods on a headless program
// that was never run and checks that none of them panics.
#[test]
fn test_program_methods() {
use hojicha_core::core::Cmd;
struct TestModel;
impl Model for TestModel {
type Message = String;
fn update(&mut self, _: Event<Self::Message>) -> Cmd<Self::Message> {
Cmd::noop()
}
fn view(&self) -> String {
"TestModel".to_string()
}
}
let options = ProgramOptions::default().headless();
let mut program = Program::with_options(TestModel, options).unwrap();
program.println("test");
program.printf(format_args!("test {}", 42));
// No message channel exists yet, so `quit` is a no-op here.
program.quit();
program.kill();
// Results are ignored: only the absence of a panic is being checked.
let _ = program.release_terminal();
let _ = program.restore_terminal();
}
// Checks that a filter closure of the expected shape can be installed; the
// filter itself is never invoked because the program is not run.
#[test]
fn test_program_with_filter() {
use hojicha_core::core::Cmd;
struct TestModel;
impl Model for TestModel {
type Message = i32;
fn update(&mut self, _: Event<Self::Message>) -> Cmd<Self::Message> {
Cmd::noop()
}
fn view(&self) -> String {
"TestModel".to_string()
}
}
let options = ProgramOptions::default().headless();
let program = Program::with_options(TestModel, options).unwrap();
// Drops user messages greater than 5 and passes everything else through.
let _filtered = program.with_filter(|_, event| match event {
Event::User(n) if n > 5 => None,
_ => Some(event),
});
}
}