1mod command_executor;
5pub mod error_handler;
6mod event_processor;
7mod fps_limiter;
8mod priority_event_processor;
9mod terminal_manager;
10
11pub use command_executor::CommandExecutor;
12pub use event_processor::EventProcessor;
13pub use fps_limiter::FpsLimiter;
14pub use priority_event_processor::{
15 get_event_stats, EventStats, PriorityConfig, PriorityEventProcessor,
16};
17pub use terminal_manager::{TerminalConfig, TerminalManager};
18
19use crate::async_handle::AsyncHandle;
23use crate::subscription::Subscription;
24use crossterm::event::{self};
25use hojicha_core::core::Model;
26use hojicha_core::error::{Error, Result};
27use hojicha_core::event::Event;
28use std::io::{self, Read, Write};
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{mpsc, Arc};
31use std::thread;
32use std::time::{Duration, Instant};
33
/// Boxed event filter installed with `Program::with_filter`.
///
/// The run loop calls it with a shared reference to the model and the event that is about to be
/// delivered. Returning `Some(event)` forwards that (possibly replaced) event to
/// `Model::update`; returning `None` drops the event for this iteration.
///
/// The associated type is spelled `<M as Model>::Message` because a type alias cannot carry the
/// `M: Model` bound that the shorter `M::Message` form would need.
type MessageFilter<M> = Box<
    dyn Fn(&M, Event<<M as Model>::Message>) -> Option<Event<<M as Model>::Message>> + Send + Sync,
>;
38
/// Boxed stop condition used by `Program::run_until`.
///
/// The run loop evaluates it against the model once per iteration, after any event has been
/// handled, and leaves the loop as soon as it returns `true`.
type ConditionCheck<M> = Box<dyn FnMut(&M) -> bool>;
41
/// Mouse reporting mode requested from the terminal.
///
/// The value is copied into `TerminalConfig::mouse_mode` when a program is created; how each
/// mode is translated into terminal escape sequences is decided by the terminal manager.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MouseMode {
    /// No mouse mode requested. This is the default.
    #[default]
    None,
    /// NOTE(review): presumably reports motion only while a button is held (cell/drag
    /// tracking) — confirm against `TerminalManager`.
    CellMotion,
    /// NOTE(review): presumably reports every pointer movement — confirm against
    /// `TerminalManager`.
    AllMotion,
}
53
/// Settings that control how a `Program` sets up the terminal and runs its loop.
///
/// Construct with `ProgramOptions::new` (or `Default`) and adjust with the builder methods.
pub struct ProgramOptions {
    /// Copied into `TerminalConfig::alt_screen`. `new()` sets this to `true`.
    pub alt_screen: bool,
    /// Copied into `TerminalConfig::mouse_mode`. `new()` sets this to `MouseMode::None`.
    pub mouse_mode: MouseMode,
    /// Copied into `TerminalConfig::bracketed_paste`. `new()` sets this to `false`.
    pub bracketed_paste: bool,
    /// Copied into `TerminalConfig::focus_reporting`. `new()` sets this to `false`.
    pub focus_reporting: bool,
    /// Frame rate handed to `FpsLimiter::new`. `new()` sets this to `60`.
    pub fps: u16,
    /// When `true` the run loop spawns no terminal input thread and reads events through
    /// `process_events_headless`. `new()` sets this to `false`.
    pub headless: bool,
    /// `new()` sets this to `true`.
    /// NOTE(review): not read anywhere in the visible part of this file — confirm where the
    /// signal handler is installed.
    pub install_signal_handler: bool,
    /// When `true` the run loop never draws and spawns no terminal input thread; the terminal
    /// is also configured as headless. `new()` sets this to `false`.
    pub without_renderer: bool,
    /// Optional custom output sink.
    /// NOTE(review): stored but not read in the visible part of this file.
    pub output: Option<Box<dyn Write + Send + Sync>>,
    /// Optional custom input source.
    /// NOTE(review): stored but not read in the visible part of this file.
    pub input: Option<Box<dyn Read + Send + Sync>>,
}
77
78impl ProgramOptions {
79 pub fn new() -> Self {
81 Self {
82 alt_screen: true,
83 mouse_mode: MouseMode::None,
84 bracketed_paste: false,
85 focus_reporting: false,
86 fps: 60,
87 headless: false,
88 install_signal_handler: true,
89 without_renderer: false,
90 output: None,
91 input: None,
92 }
93 }
94
95 pub fn with_alt_screen(mut self, enable: bool) -> Self {
97 self.alt_screen = enable;
98 self
99 }
100
101 pub fn with_mouse_mode(mut self, mode: MouseMode) -> Self {
103 self.mouse_mode = mode;
104 self
105 }
106
107 pub fn with_bracketed_paste(mut self, enable: bool) -> Self {
109 self.bracketed_paste = enable;
110 self
111 }
112
113 pub fn with_focus_reporting(mut self, enable: bool) -> Self {
115 self.focus_reporting = enable;
116 self
117 }
118
119 pub fn with_fps(mut self, fps: u16) -> Self {
121 self.fps = fps;
122 self
123 }
124
125 pub fn headless(mut self) -> Self {
127 self.headless = true;
128 self
129 }
130
131 pub fn without_signal_handler(mut self) -> Self {
133 self.install_signal_handler = false;
134 self
135 }
136
137 pub fn without_renderer(mut self) -> Self {
139 self.without_renderer = true;
140 self
141 }
142
143 pub fn with_output(mut self, output: Box<dyn Write + Send + Sync>) -> Self {
145 self.output = Some(output);
146 self
147 }
148
149 pub fn with_input(mut self, input: Box<dyn Read + Send + Sync>) -> Self {
151 self.input = Some(input);
152 self
153 }
154
155 pub fn with_input_string(mut self, input: &str) -> Self {
157 use std::io::Cursor;
158 self.input = Some(Box::new(Cursor::new(input.as_bytes().to_vec())));
159 self
160 }
161}
162
163impl Default for ProgramOptions {
164 fn default() -> Self {
165 Self::new()
166 }
167}
168
/// Runtime that owns a model and drives it: it collects events, feeds them to
/// `Model::update`, executes the returned commands and renders `Model::view`.
pub struct Program<M: Model> {
    /// The application model; the run loop calls `init`, `update` and `view` on it.
    model: M,
    /// Options the program was created with; the run loop reads `headless` and
    /// `without_renderer` from here.
    options: ProgramOptions,
    /// Terminal wrapper used for drawing, release/restore and cleanup.
    terminal_manager: TerminalManager,
    /// Executes commands returned by the model and spawns async tasks.
    command_executor: CommandExecutor<M::Message>,
    /// Gates rendering through `should_render` / `mark_rendered`.
    fps_limiter: FpsLimiter,
    /// Sending half of the message channel; `None` until `init_async_bridge` is called or
    /// the run loop starts.
    message_tx: Option<mpsc::SyncSender<Event<M::Message>>>,
    /// Receiving half of the message channel; taken by the run loop when it starts.
    message_rx: Option<mpsc::Receiver<Event<M::Message>>>,
    /// Selects the next event to handle and keeps event statistics/metrics.
    priority_processor: PriorityEventProcessor<M::Message>,
    /// Optional filter applied to every event before it reaches `Model::update`.
    filter: Option<MessageFilter<M>>,
    /// Set to `true` when the run loop starts and to `false` when it ends, on `kill`, and on drop.
    running: Arc<AtomicBool>,
    /// Set by `kill` and on drop; makes the run loop and the input thread stop.
    force_quit: Arc<AtomicBool>,
    /// Handle of the terminal input thread, joined on drop. `None` when no thread was spawned.
    input_thread: Option<thread::JoinHandle<()>>,
}
184
185impl<M: Model> Program<M>
186where
187 M::Message: Clone,
188{
189 pub fn new(model: M) -> Result<Self> {
191 Self::with_options(model, ProgramOptions::default())
192 }
193
194 pub fn with_options(model: M, options: ProgramOptions) -> Result<Self> {
196 let terminal_config = TerminalConfig {
198 alt_screen: options.alt_screen,
199 mouse_mode: options.mouse_mode,
200 bracketed_paste: options.bracketed_paste,
201 focus_reporting: options.focus_reporting,
202 headless: options.headless || options.without_renderer,
203 };
204 let terminal_manager = TerminalManager::new(terminal_config)?;
205
206 let command_executor = CommandExecutor::new()?;
208
209 let fps_limiter = FpsLimiter::new(options.fps);
211
212 let priority_processor = PriorityEventProcessor::new();
214
215 log::info!("Hojicha program initialized with priority event processing");
216
217 Ok(Self {
218 model,
219 options,
220 terminal_manager,
221 command_executor,
222 fps_limiter,
223 message_tx: None,
224 message_rx: None,
225 priority_processor,
226 filter: None,
227 running: Arc::new(AtomicBool::new(false)),
228 force_quit: Arc::new(AtomicBool::new(false)),
229 input_thread: None,
230 })
231 }
232
233 pub fn with_filter<F>(mut self, filter: F) -> Self
235 where
236 F: Fn(&M, Event<M::Message>) -> Option<Event<M::Message>> + Send + Sync + 'static,
237 {
238 self.filter = Some(Box::new(filter));
239 self
240 }
241
242 pub fn with_priority_config(mut self, config: PriorityConfig) -> Self {
244 self.priority_processor = PriorityEventProcessor::with_config(config);
245 log::debug!("Priority processor configured with custom settings");
246 self
247 }
248
249 pub fn event_stats(&self) -> EventStats {
251 self.priority_processor.stats()
252 }
253
254 pub fn event_stats_string(&self) -> String {
256 get_event_stats(&self.priority_processor)
257 }
258
259 pub fn printf(&self, args: std::fmt::Arguments) {
261 eprint!("{args}");
262 let _ = io::stderr().flush();
263 }
264
265 pub fn println(&self, text: &str) {
267 eprintln!("{text}");
268 let _ = io::stderr().flush();
269 }
270
271 pub fn quit(&self) {
273 if let Some(tx) = &self.message_tx {
274 let _ = tx.send(Event::Quit);
275 }
276 }
277
278 pub fn kill(&self) {
298 self.force_quit.store(true, Ordering::SeqCst);
299 self.running.store(false, Ordering::SeqCst);
300 }
301
302 pub fn metrics(&self) -> crate::metrics::AdvancedEventStats {
304 self.priority_processor.advanced_metrics()
305 }
306
307 pub fn metrics_json(&self) -> String {
309 self.priority_processor.metrics_collector().export_json()
310 }
311
312 pub fn metrics_prometheus(&self) -> String {
314 self.priority_processor
315 .metrics_collector()
316 .export_prometheus()
317 }
318
319 pub fn metrics_text(&self) -> String {
321 self.priority_processor.metrics_collector().export_text()
322 }
323
324 pub fn metrics_dashboard(&self) -> String {
326 let stats = self.priority_processor.advanced_metrics();
327 crate::metrics::display_dashboard(&stats)
328 }
329
330 pub fn reset_metrics(&self) {
332 self.priority_processor.reset_stats();
333 }
334
335 pub fn resize_queue(&self, new_size: usize) -> Result<()> {
337 self.priority_processor
338 .resize_queue(new_size)
339 .map_err(|e| Error::Custom(Box::new(e)))
340 }
341
342 pub fn queue_capacity(&self) -> usize {
344 self.priority_processor.queue_capacity()
345 }
346
347 pub fn enable_auto_scaling(
349 &mut self,
350 config: crate::queue_scaling::AutoScaleConfig,
351 ) -> &mut Self {
352 self.priority_processor.enable_auto_scaling(config);
353 self
354 }
355
356 pub fn with_auto_scaling(mut self) -> Self {
358 self.priority_processor
359 .enable_auto_scaling(crate::queue_scaling::AutoScaleConfig::default());
360 self
361 }
362
363 pub fn disable_auto_scaling(&mut self) -> &mut Self {
365 self.priority_processor.disable_auto_scaling();
366 self
367 }
368
369 pub fn sender(&self) -> Option<mpsc::SyncSender<Event<M::Message>>> {
389 self.message_tx.clone()
390 }
391
392 pub fn send_message(&self, msg: M::Message) -> Result<()> {
401 self.message_tx
402 .as_ref()
403 .ok_or_else(|| {
404 Error::from(io::Error::new(
405 io::ErrorKind::NotConnected,
406 "Program not running",
407 ))
408 })?
409 .send(Event::User(msg))
410 .map_err(|_| {
411 Error::from(io::Error::new(
412 io::ErrorKind::BrokenPipe,
413 "Receiver disconnected",
414 ))
415 })
416 }
417
418 pub fn wait(&self) {
420 while !self.running.load(Ordering::SeqCst) && !self.force_quit.load(Ordering::SeqCst) {
421 thread::sleep(Duration::from_millis(1));
422 }
423 while self.running.load(Ordering::SeqCst) && !self.force_quit.load(Ordering::SeqCst) {
424 thread::sleep(Duration::from_millis(10));
425 }
426 }
427
428 pub fn release_terminal(&mut self) -> Result<()> {
430 self.terminal_manager.release().map_err(Error::from)
431 }
432
433 pub fn restore_terminal(&mut self) -> Result<()> {
435 self.terminal_manager.restore().map_err(Error::from)
436 }
437
438 pub fn init_async_bridge(&mut self) -> mpsc::SyncSender<Event<M::Message>> {
484 if self.message_tx.is_none() {
485 let (message_tx, message_rx) = mpsc::sync_channel(100);
486 self.message_tx = Some(message_tx.clone());
487 self.message_rx = Some(message_rx);
488 message_tx
489 } else {
490 self.message_tx
492 .as_ref()
493 .expect("message_tx should be Some after init check")
494 .clone()
495 }
496 }
497
498 pub fn subscribe<S>(&mut self, stream: S) -> Subscription
545 where
546 S: futures::Stream<Item = M::Message> + Send + 'static,
547 M::Message: Send + 'static,
548 {
549 use futures::StreamExt;
550 use tokio_util::sync::CancellationToken;
551
552 if self.message_tx.is_none() {
554 self.init_async_bridge();
555 }
556
557 let sender = self
558 .message_tx
559 .as_ref()
560 .expect("message_tx should be Some after init_async_bridge")
561 .clone();
562 let cancel_token = CancellationToken::new();
563 let cancel_clone = cancel_token.clone();
564
565 let handle = self.command_executor.spawn(async move {
567 tokio::pin!(stream);
568
569 loop {
570 tokio::select! {
571 _ = cancel_clone.cancelled() => {
572 break; }
574 item = stream.next() => {
575 match item {
576 Some(msg) => {
577 if sender.send(Event::User(msg)).is_err() {
578 break; }
580 }
581 None => {
582 break; }
584 }
585 }
586 }
587 }
588 });
589
590 Subscription::new(handle, cancel_token)
591 }
592
593 pub fn spawn_cancellable<F, Fut, T>(&self, f: F) -> AsyncHandle<T>
647 where
648 F: FnOnce(tokio_util::sync::CancellationToken) -> Fut,
649 Fut: std::future::Future<Output = T> + Send + 'static,
650 T: Send + 'static,
651 {
652 use tokio_util::sync::CancellationToken;
653
654 let cancel_token = CancellationToken::new();
655 let token_clone = cancel_token.clone();
656
657 let handle = self.command_executor.spawn(f(token_clone));
658
659 AsyncHandle::new(handle, cancel_token)
660 }
661
662 pub fn spawn_cancellable_cmd<F, Fut>(&mut self, f: F) -> AsyncHandle<()>
678 where
679 F: FnOnce(tokio_util::sync::CancellationToken, mpsc::SyncSender<Event<M::Message>>) -> Fut,
680 Fut: std::future::Future<Output = ()> + Send + 'static,
681 M::Message: Send + 'static,
682 {
683 use tokio_util::sync::CancellationToken;
684
685 if self.message_tx.is_none() {
687 self.init_async_bridge();
688 }
689
690 let sender = self
691 .message_tx
692 .as_ref()
693 .expect("message_tx should be Some after init_async_bridge")
694 .clone();
695 let cancel_token = CancellationToken::new();
696 let token_clone = cancel_token.clone();
697
698 let handle = self.command_executor.spawn(f(token_clone, sender));
699
700 AsyncHandle::new(handle, cancel_token)
701 }
702
703 pub fn spawn<Fut>(&mut self, fut: Fut)
716 where
717 Fut: std::future::Future<Output = Option<M::Message>> + Send + 'static,
718 M::Message: Send + 'static,
719 {
720 if self.message_tx.is_none() {
722 self.init_async_bridge();
723 }
724
725 let sender = self
726 .message_tx
727 .as_ref()
728 .expect("message_tx should be Some after init_async_bridge")
729 .clone();
730
731 self.command_executor.spawn(async move {
732 if let Some(msg) = fut.await {
733 let _ = sender.send(Event::User(msg));
734 }
735 });
736 }
737
738 pub fn subscribe_interval<F>(&mut self, interval: Duration, mut callback: F) -> Subscription
747 where
748 F: FnMut() -> M::Message + Send + 'static,
749 M::Message: Send + 'static,
750 {
751 use futures::stream::StreamExt;
752
753 let stream = async_stream::stream! {
754 let mut interval = tokio::time::interval(interval);
755 loop {
756 interval.tick().await;
757 yield callback();
758 }
759 };
760
761 self.subscribe(stream)
762 }
763
764 pub fn subscribe_with_error<S, T, E, F, G>(
777 &mut self,
778 stream: S,
779 on_item: F,
780 on_error: G,
781 ) -> Subscription
782 where
783 S: futures::Stream<Item = std::result::Result<T, E>> + Send + 'static,
784 F: Fn(T) -> M::Message + Send + 'static,
785 G: Fn(E) -> M::Message + Send + 'static,
786 M::Message: Send + 'static,
787 {
788 use futures::StreamExt;
789
790 let mapped_stream = stream.map(move |result| match result {
791 Ok(item) => on_item(item),
792 Err(error) => on_error(error),
793 });
794
795 self.subscribe(mapped_stream)
796 }
797
798 pub fn run_with_timeout(self, timeout: Duration) -> Result<()> {
803 let start = Instant::now();
804 self.run_internal(Some(timeout), Some(start), None)
805 }
806
807 pub fn run_until<F>(self, condition: F) -> Result<()>
812 where
813 F: FnMut(&M) -> bool + 'static,
814 {
815 self.run_with_condition(Some(Box::new(condition)))
816 }
817
818 pub fn run(self) -> Result<()> {
820 self.run_internal(None, None, None)
821 }
822
823 fn run_with_condition(self, condition: Option<ConditionCheck<M>>) -> Result<()> {
825 self.run_internal(None, None, condition)
826 }
827
828 fn run_internal(
830 mut self,
831 timeout: Option<Duration>,
832 start_time: Option<Instant>,
833 mut condition: Option<ConditionCheck<M>>,
834 ) -> Result<()> {
835 self.running.store(true, Ordering::SeqCst);
837
838 let (crossterm_tx, crossterm_rx) = mpsc::sync_channel(100);
839
840 let (message_tx, message_rx) = if let Some(rx) = self.message_rx.take() {
842 let tx = self
844 .message_tx
845 .as_ref()
846 .expect("message_tx should be Some when message_rx is Some")
847 .clone();
848 (tx, rx)
849 } else {
850 let (tx, rx) = mpsc::sync_channel(100);
851 self.message_tx = Some(tx.clone());
852 (tx, rx)
853 };
854
855 if !self.options.headless && !self.options.without_renderer {
857 let running = Arc::clone(&self.running);
858 let force_quit = Arc::clone(&self.force_quit);
859
860 let input_thread = thread::spawn(move || loop {
861 if !running.load(Ordering::SeqCst) || force_quit.load(Ordering::SeqCst) {
862 break;
863 }
864
865 if event::poll(Duration::from_millis(100)).unwrap_or(false) {
866 if let Ok(event) = event::read() {
867 let _ = crossterm_tx.send(event);
868 }
869 }
870 });
871 self.input_thread = Some(input_thread);
872 }
873
874 let init_cmd = self.model.init();
876 if !init_cmd.is_noop() {
877 self.command_executor.execute(init_cmd, message_tx.clone());
878 }
879
880 let tick_rate = Duration::from_millis(250);
882 loop {
883 if self.force_quit.load(Ordering::SeqCst) {
884 break;
885 }
886
887 if let (Some(timeout), Some(start)) = (timeout, start_time) {
889 if start.elapsed() >= timeout {
890 break; }
892 }
893
894 let event_timeout = if timeout.is_some() {
896 Duration::from_millis(10) } else {
898 tick_rate
899 };
900 let event = if self.options.headless {
901 self.priority_processor
902 .process_events_headless(&message_rx, event_timeout)
903 } else {
904 self.priority_processor
905 .process_events(&message_rx, &crossterm_rx, event_timeout)
906 };
907
908 if let Some(event) = event {
909 if matches!(event, Event::Quit) {
911 break;
912 }
913
914 let event = if let Some(ref filter) = self.filter {
916 filter(&self.model, event)
917 } else {
918 Some(event)
919 };
920
921 if let Some(event) = event {
923 let cmd = self.model.update(event);
924
925 if cmd.is_quit() {
927 break;
928 }
929
930 if !cmd.is_noop() {
932 self.command_executor.execute(cmd, message_tx.clone());
933 }
934 }
935 }
936
937 if let Some(ref mut cond) = condition {
939 if cond(&self.model) {
940 break; }
942 }
943
944 if !self.options.without_renderer && self.fps_limiter.should_render() {
946 self.terminal_manager
947 .draw(|f| {
948 let area = f.area();
949 self.model.view(f, area);
950 })
951 .map_err(Error::from)?;
952 self.fps_limiter.mark_rendered();
953 }
954 }
955
956 log::info!(
958 "Program shutting down. Final stats: {}",
959 self.event_stats_string()
960 );
961
962 self.running.store(false, Ordering::SeqCst);
964 self.terminal_manager.cleanup().map_err(Error::from)?;
965
966 Ok(())
967 }
968}
969
970impl<M: Model> Drop for Program<M> {
971 fn drop(&mut self) {
972 let _ = self.terminal_manager.cleanup();
973
974 self.running.store(false, Ordering::SeqCst);
976 self.force_quit.store(true, Ordering::SeqCst);
977
978 if let Some(thread) = self.input_thread.take() {
979 let _ = thread.join();
980 }
981 }
982}
983
#[cfg(test)]
mod tests {
    use super::*;

    /// Every builder method records its setting on the returned options.
    #[test]
    fn test_program_options_all_methods() {
        let configured = ProgramOptions::default()
            .with_mouse_mode(MouseMode::CellMotion)
            .with_alt_screen(true)
            .with_bracketed_paste(true)
            .with_focus_reporting(true)
            .with_fps(120)
            .headless()
            .without_signal_handler()
            .without_renderer();

        assert_eq!(configured.mouse_mode, MouseMode::CellMotion);
        assert_eq!(configured.fps, 120);
        assert!(configured.alt_screen);
        assert!(configured.bracketed_paste);
        assert!(configured.focus_reporting);
        assert!(configured.headless);
        assert!(configured.without_renderer);
        assert!(!configured.install_signal_handler);
    }

    /// `MouseMode::None` is the default variant.
    #[test]
    fn test_mouse_mode_default() {
        let default_mode = MouseMode::default();
        assert_eq!(default_mode, MouseMode::None);
    }

    /// Creating and immediately dropping a headless program must not panic.
    #[test]
    fn test_program_drop() {
        use hojicha_core::core::Cmd;

        struct UnitModel;
        impl Model for UnitModel {
            type Message = ();
            fn update(&mut self, _: Event<Self::Message>) -> Cmd<Self::Message> {
                Cmd::none()
            }
            fn view(&self, _: &mut ratatui::Frame, _: ratatui::layout::Rect) {}
        }

        let headless = ProgramOptions::default().headless();
        let program = Program::with_options(UnitModel, headless).unwrap();
        drop(program);
    }

    /// Smoke test: the control and output methods are callable before the program runs.
    #[test]
    fn test_program_methods() {
        use hojicha_core::core::Cmd;

        struct StringModel;
        impl Model for StringModel {
            type Message = String;
            fn update(&mut self, _: Event<Self::Message>) -> Cmd<Self::Message> {
                Cmd::none()
            }
            fn view(&self, _: &mut ratatui::Frame, _: ratatui::layout::Rect) {}
        }

        let headless = ProgramOptions::default().headless();
        let mut program = Program::with_options(StringModel, headless).unwrap();

        // Output helpers.
        program.println("test");
        program.printf(format_args!("test {}", 42));

        // Stop requests; no channel exists yet, so `quit` has nothing to send to.
        program.quit();
        program.kill();

        // Terminal hand-over; the results are irrelevant for this smoke test.
        let _ = program.release_terminal();
        let _ = program.restore_terminal();
    }

    /// A filter closure can be attached to a program through the builder.
    #[test]
    fn test_program_with_filter() {
        use hojicha_core::core::Cmd;

        struct IntModel;
        impl Model for IntModel {
            type Message = i32;
            fn update(&mut self, _: Event<Self::Message>) -> Cmd<Self::Message> {
                Cmd::none()
            }
            fn view(&self, _: &mut ratatui::Frame, _: ratatui::layout::Rect) {}
        }

        let headless = ProgramOptions::default().headless();
        let unfiltered = Program::with_options(IntModel, headless).unwrap();

        // Drops user messages greater than 5 and lets everything else through.
        let _filtered = unfiltered.with_filter(|_, event| match event {
            Event::User(n) if n > 5 => None,
            _ => Some(event),
        });
    }
}