hojicha_runtime/
program.rs

1//! The main Program struct that runs the application
2
3// Module components
4mod command_executor;
5pub mod error_handler;
6mod event_processor;
7mod fps_limiter;
8mod priority_event_processor;
9mod terminal_manager;
10
11pub use command_executor::CommandExecutor;
12pub use event_processor::EventProcessor;
13pub use fps_limiter::FpsLimiter;
14pub use priority_event_processor::{
15    get_event_stats, EventStats, PriorityConfig, PriorityEventProcessor,
16};
17pub use terminal_manager::{TerminalConfig, TerminalManager};
18
19// Re-export the main types from program_old.rs for backward compatibility
20// We'll gradually migrate the implementation to use the extracted components
21
22use crate::async_handle::AsyncHandle;
23use crate::subscription::Subscription;
24use crossterm::event::{self};
25use hojicha_core::core::Model;
26use hojicha_core::error::{Error, Result};
27use hojicha_core::event::Event;
28use std::io::{self, Read, Write};
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{mpsc, Arc};
31use std::thread;
32use std::time::{Duration, Instant};
33
/// Boxed event filter installed through `Program::with_filter`.
///
/// The filter receives a shared reference to the model together with the
/// incoming event. Returning `Some(event)` forwards that (possibly rewritten)
/// event to the model's `update`; returning `None` drops the event so the
/// model is not updated for it.
type MessageFilter<M> = Box<
    dyn Fn(&M, Event<<M as Model>::Message>) -> Option<Event<<M as Model>::Message>> + Send + Sync,
>;
38
/// Boxed exit predicate used by `run_until`.
///
/// The event loop calls it with the current model once per loop iteration
/// and stops as soon as it returns `true`.
type ConditionCheck<M> = Box<dyn FnMut(&M) -> bool>;
41
/// Mouse tracking mode
///
/// The enum is fieldless, so equality is total and hashing is trivial; it
/// therefore derives `Eq` and `Hash` in addition to `PartialEq`, which lets
/// it be used as a map/set key and in `match` guards without surprises.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum MouseMode {
    /// No mouse tracking
    #[default]
    None,
    /// Track mouse events only when buttons are pressed
    CellMotion,
    /// Track all mouse movement, even without button presses
    AllMotion,
}
53
/// Options for configuring the program
///
/// Build one with `ProgramOptions::new()` (or `Default::default()`) and
/// adjust it with the chained `with_*` / `without_*` / `headless` methods.
pub struct ProgramOptions {
    /// Whether to use alternate screen
    pub alt_screen: bool,
    /// Mouse tracking mode
    pub mouse_mode: MouseMode,
    /// Enable bracketed paste mode
    pub bracketed_paste: bool,
    /// Enable focus reporting
    pub focus_reporting: bool,
    /// Frames per second (0 = unlimited)
    pub fps: u16,
    /// Run in headless mode without rendering
    pub headless: bool,
    /// Whether to install signal handlers (`true` by default; cleared by
    /// `without_signal_handler`)
    pub install_signal_handler: bool,
    /// When `true`, the renderer is disabled
    pub without_renderer: bool,
    /// Custom output writer
    pub output: Option<Box<dyn Write + Send + Sync>>,
    /// Custom input reader
    pub input: Option<Box<dyn Read + Send + Sync>>,
}
77
78impl ProgramOptions {
79    /// Create new default options
80    pub fn new() -> Self {
81        Self {
82            alt_screen: true,
83            mouse_mode: MouseMode::None,
84            bracketed_paste: false,
85            focus_reporting: false,
86            fps: 60,
87            headless: false,
88            install_signal_handler: true,
89            without_renderer: false,
90            output: None,
91            input: None,
92        }
93    }
94
95    /// Enable or disable alternate screen
96    pub fn with_alt_screen(mut self, enable: bool) -> Self {
97        self.alt_screen = enable;
98        self
99    }
100
101    /// Set mouse tracking mode
102    pub fn with_mouse_mode(mut self, mode: MouseMode) -> Self {
103        self.mouse_mode = mode;
104        self
105    }
106
107    /// Enable bracketed paste mode
108    pub fn with_bracketed_paste(mut self, enable: bool) -> Self {
109        self.bracketed_paste = enable;
110        self
111    }
112
113    /// Enable focus reporting
114    pub fn with_focus_reporting(mut self, enable: bool) -> Self {
115        self.focus_reporting = enable;
116        self
117    }
118
119    /// Set frames per second
120    pub fn with_fps(mut self, fps: u16) -> Self {
121        self.fps = fps;
122        self
123    }
124
125    /// Run in headless mode
126    pub fn headless(mut self) -> Self {
127        self.headless = true;
128        self
129    }
130
131    /// Disable signal handlers
132    pub fn without_signal_handler(mut self) -> Self {
133        self.install_signal_handler = false;
134        self
135    }
136
137    /// Disable renderer
138    pub fn without_renderer(mut self) -> Self {
139        self.without_renderer = true;
140        self
141    }
142
143    /// Set custom output
144    pub fn with_output(mut self, output: Box<dyn Write + Send + Sync>) -> Self {
145        self.output = Some(output);
146        self
147    }
148
149    /// Set custom input source
150    pub fn with_input(mut self, input: Box<dyn Read + Send + Sync>) -> Self {
151        self.input = Some(input);
152        self
153    }
154
155    /// Set input from a string (convenience method for testing)
156    pub fn with_input_string(mut self, input: &str) -> Self {
157        use std::io::Cursor;
158        self.input = Some(Box::new(Cursor::new(input.as_bytes().to_vec())));
159        self
160    }
161}
162
163impl Default for ProgramOptions {
164    fn default() -> Self {
165        Self::new()
166    }
167}
168
/// The main program that runs your application
///
/// Owns the user's model together with the runtime pieces (terminal,
/// command executor, event processor) that drive it.
pub struct Program<M: Model> {
    /// The application model; the event loop calls its `init`, `update`
    /// and `view`.
    model: M,
    /// Options the program was created with.
    options: ProgramOptions,
    /// Terminal wrapper used for drawing and for release/restore.
    terminal_manager: TerminalManager,
    /// Executes model commands and spawns async tasks.
    command_executor: CommandExecutor<M::Message>,
    /// Decides whether a frame may be rendered in the current iteration.
    fps_limiter: FpsLimiter,
    /// Sending half of the message channel; `None` until
    /// `init_async_bridge` or a `run*` call creates the channel.
    message_tx: Option<mpsc::SyncSender<Event<M::Message>>>,
    /// Receiving half of the message channel; taken by the event loop when
    /// the program starts running.
    message_rx: Option<mpsc::Receiver<Event<M::Message>>>,
    /// Pulls the next event from the message and terminal channels and
    /// collects event statistics.
    priority_processor: PriorityEventProcessor<M::Message>,
    /// Optional event filter applied before the model is updated.
    filter: Option<MessageFilter<M>>,
    /// Set to `true` when the event loop starts; cleared by `kill()` and
    /// during shutdown.
    running: Arc<AtomicBool>,
    /// Set by `kill()`; checked at the top of every event-loop iteration.
    force_quit: Arc<AtomicBool>,
    /// Handle of the terminal input polling thread, when one was spawned.
    input_thread: Option<thread::JoinHandle<()>>,
}
184
185impl<M: Model> Program<M>
186where
187    M::Message: Clone,
188{
189    /// Create a new program with the given model
190    pub fn new(model: M) -> Result<Self> {
191        Self::with_options(model, ProgramOptions::default())
192    }
193
194    /// Create a new program with custom options
195    pub fn with_options(model: M, options: ProgramOptions) -> Result<Self> {
196        // Create terminal manager
197        let terminal_config = TerminalConfig {
198            alt_screen: options.alt_screen,
199            mouse_mode: options.mouse_mode,
200            bracketed_paste: options.bracketed_paste,
201            focus_reporting: options.focus_reporting,
202            headless: options.headless || options.without_renderer,
203        };
204        let terminal_manager = TerminalManager::new(terminal_config)?;
205
206        // Create command executor
207        let command_executor = CommandExecutor::new()?;
208
209        // Create FPS limiter
210        let fps_limiter = FpsLimiter::new(options.fps);
211
212        // Create priority event processor with default config
213        let priority_processor = PriorityEventProcessor::new();
214
215        log::info!("Hojicha program initialized with priority event processing");
216
217        Ok(Self {
218            model,
219            options,
220            terminal_manager,
221            command_executor,
222            fps_limiter,
223            message_tx: None,
224            message_rx: None,
225            priority_processor,
226            filter: None,
227            running: Arc::new(AtomicBool::new(false)),
228            force_quit: Arc::new(AtomicBool::new(false)),
229            input_thread: None,
230        })
231    }
232
233    /// Set a message filter function
234    pub fn with_filter<F>(mut self, filter: F) -> Self
235    where
236        F: Fn(&M, Event<M::Message>) -> Option<Event<M::Message>> + Send + Sync + 'static,
237    {
238        self.filter = Some(Box::new(filter));
239        self
240    }
241
242    /// Configure the priority event processor
243    pub fn with_priority_config(mut self, config: PriorityConfig) -> Self {
244        self.priority_processor = PriorityEventProcessor::with_config(config);
245        log::debug!("Priority processor configured with custom settings");
246        self
247    }
248
249    /// Get current event processing statistics
250    pub fn event_stats(&self) -> EventStats {
251        self.priority_processor.stats()
252    }
253
254    /// Get a formatted string of event statistics (useful for debugging)
255    pub fn event_stats_string(&self) -> String {
256        get_event_stats(&self.priority_processor)
257    }
258
259    /// Print formatted text to stderr
260    pub fn printf(&self, args: std::fmt::Arguments) {
261        eprint!("{args}");
262        let _ = io::stderr().flush();
263    }
264
265    /// Print a line to stderr
266    pub fn println(&self, text: &str) {
267        eprintln!("{text}");
268        let _ = io::stderr().flush();
269    }
270
271    /// Send a quit message to the running program
272    pub fn quit(&self) {
273        if let Some(tx) = &self.message_tx {
274            let _ = tx.send(Event::Quit);
275        }
276    }
277
278    /// Force kill the program immediately
279    ///
280    /// This method forcefully terminates the program without any cleanup.
281    /// Unlike `quit()` which sends a quit message through the event loop,
282    /// `kill()` immediately stops execution.
283    ///
284    /// # Use Cases
285    /// - Emergency shutdown when the program is unresponsive
286    /// - Testing scenarios requiring immediate termination
287    /// - Signal handlers that need to stop the program immediately
288    ///
289    /// # Example
290    /// ```ignore
291    /// // In a signal handler or emergency shutdown
292    /// program.kill();
293    /// ```
294    ///
295    /// # Note
296    /// This bypasses normal cleanup procedures. Prefer `quit()` for graceful shutdown.
297    pub fn kill(&self) {
298        self.force_quit.store(true, Ordering::SeqCst);
299        self.running.store(false, Ordering::SeqCst);
300    }
301
302    /// Get advanced performance metrics
303    pub fn metrics(&self) -> crate::metrics::AdvancedEventStats {
304        self.priority_processor.advanced_metrics()
305    }
306
307    /// Export metrics in JSON format
308    pub fn metrics_json(&self) -> String {
309        self.priority_processor.metrics_collector().export_json()
310    }
311
312    /// Export metrics in Prometheus format
313    pub fn metrics_prometheus(&self) -> String {
314        self.priority_processor
315            .metrics_collector()
316            .export_prometheus()
317    }
318
319    /// Export metrics in plain text format
320    pub fn metrics_text(&self) -> String {
321        self.priority_processor.metrics_collector().export_text()
322    }
323
324    /// Display metrics dashboard (for debugging)
325    pub fn metrics_dashboard(&self) -> String {
326        let stats = self.priority_processor.advanced_metrics();
327        crate::metrics::display_dashboard(&stats)
328    }
329
330    /// Reset all metrics
331    pub fn reset_metrics(&self) {
332        self.priority_processor.reset_stats();
333    }
334
335    /// Dynamically resize the event queue
336    pub fn resize_queue(&self, new_size: usize) -> Result<()> {
337        self.priority_processor
338            .resize_queue(new_size)
339            .map_err(|e| Error::Custom(Box::new(e)))
340    }
341
342    /// Get current queue capacity
343    pub fn queue_capacity(&self) -> usize {
344        self.priority_processor.queue_capacity()
345    }
346
347    /// Enable auto-scaling with the specified configuration
348    pub fn enable_auto_scaling(
349        &mut self,
350        config: crate::queue_scaling::AutoScaleConfig,
351    ) -> &mut Self {
352        self.priority_processor.enable_auto_scaling(config);
353        self
354    }
355
356    /// Enable auto-scaling with default configuration
357    pub fn with_auto_scaling(mut self) -> Self {
358        self.priority_processor
359            .enable_auto_scaling(crate::queue_scaling::AutoScaleConfig::default());
360        self
361    }
362
363    /// Disable auto-scaling
364    pub fn disable_auto_scaling(&mut self) -> &mut Self {
365        self.priority_processor.disable_auto_scaling();
366        self
367    }
368
369    /// Get a sender for injecting events from external sources
370    ///
371    /// This allows external async tasks to send messages to the program's event loop.
372    /// The sender is thread-safe and can be cloned for use in multiple threads.
373    ///
374    /// # Example
375    /// ```ignore
376    /// let mut program = Program::new(model)?;
377    /// let sender = program.sender();
378    ///
379    /// std::thread::spawn(move || {
380    ///     loop {
381    ///         std::thread::sleep(Duration::from_secs(1));
382    ///         let _ = sender.send(Event::User(Msg::Tick));
383    ///     }
384    /// });
385    ///
386    /// program.run()?;
387    /// ```
388    pub fn sender(&self) -> Option<mpsc::SyncSender<Event<M::Message>>> {
389        self.message_tx.clone()
390    }
391
392    /// Send a user message to the program
393    ///
394    /// Convenience method that wraps the message in Event::User.
395    ///
396    /// # Example
397    /// ```ignore
398    /// program.send_message(Msg::DataLoaded(data))?;
399    /// ```
400    pub fn send_message(&self, msg: M::Message) -> Result<()> {
401        self.message_tx
402            .as_ref()
403            .ok_or_else(|| {
404                Error::from(io::Error::new(
405                    io::ErrorKind::NotConnected,
406                    "Program not running",
407                ))
408            })?
409            .send(Event::User(msg))
410            .map_err(|_| {
411                Error::from(io::Error::new(
412                    io::ErrorKind::BrokenPipe,
413                    "Receiver disconnected",
414                ))
415            })
416    }
417
418    /// Wait for the program to finish
419    pub fn wait(&self) {
420        while !self.running.load(Ordering::SeqCst) && !self.force_quit.load(Ordering::SeqCst) {
421            thread::sleep(Duration::from_millis(1));
422        }
423        while self.running.load(Ordering::SeqCst) && !self.force_quit.load(Ordering::SeqCst) {
424            thread::sleep(Duration::from_millis(10));
425        }
426    }
427
428    /// Release the terminal
429    pub fn release_terminal(&mut self) -> Result<()> {
430        self.terminal_manager.release().map_err(Error::from)
431    }
432
433    /// Restore the terminal
434    pub fn restore_terminal(&mut self) -> Result<()> {
435        self.terminal_manager.restore().map_err(Error::from)
436    }
437
438    /// Initialize the async message bridge for external event injection
439    ///
440    /// This sets up channels for external systems to send messages into the program's
441    /// event loop. Must be called before `run()` if you need external message injection.
442    ///
443    /// # Use Cases
444    ///
445    /// - **WebSocket Integration**: Forward messages from WebSocket connections
446    /// - **File Watchers**: Send events when files change
447    /// - **Timers**: Implement custom scheduling beyond tick/every
448    /// - **Database Listeners**: Forward change notifications
449    /// - **IPC**: Receive messages from other processes
450    /// - **HTTP Responses**: Send results from async HTTP requests
451    ///
452    /// # Thread Safety
453    ///
454    /// The returned sender can be cloned and shared across multiple threads safely.
455    /// Messages are queued with a capacity of 100 by default.
456    ///
457    /// # Example
458    ///
459    /// ```ignore
460    /// let mut program = Program::new(model)?;
461    /// let sender = program.init_async_bridge();
462    ///
463    /// // WebSocket integration example
464    /// let ws_sender = sender.clone();
465    /// tokio::spawn(async move {
466    ///     let mut ws = connect_websocket().await;
467    ///     while let Some(msg) = ws.recv().await {
468    ///         ws_sender.send(Event::User(Msg::WsMessage(msg))).ok();
469    ///     }
470    /// });
471    ///
472    /// // File watcher example
473    /// let fs_sender = sender.clone();
474    /// thread::spawn(move || {
475    ///     let mut watcher = FileWatcher::new();
476    ///     for event in watcher.events() {
477    ///         fs_sender.send(Event::User(Msg::FileChanged(event))).ok();
478    ///     }
479    /// });
480    ///
481    /// program.run()?;
482    /// ```
483    pub fn init_async_bridge(&mut self) -> mpsc::SyncSender<Event<M::Message>> {
484        if self.message_tx.is_none() {
485            let (message_tx, message_rx) = mpsc::sync_channel(100);
486            self.message_tx = Some(message_tx.clone());
487            self.message_rx = Some(message_rx);
488            message_tx
489        } else {
490            // Safe to unwrap here because we just checked is_none() above
491            self.message_tx
492                .as_ref()
493                .expect("message_tx should be Some after init check")
494                .clone()
495        }
496    }
497
498    /// Subscribe to an async stream of events
499    ///
500    /// Connects any `futures::Stream` to your program's event loop. Each item from
501    /// the stream is automatically wrapped in `Event::User` and sent to your model's
502    /// update function.
503    ///
504    /// # Use Cases
505    ///
506    /// - **WebSocket/SSE**: Stream real-time messages
507    /// - **File Watching**: Monitor file system changes
508    /// - **Periodic Tasks**: Custom intervals and scheduling
509    /// - **Database Changes**: Listen to change streams
510    /// - **Sensor Data**: Process continuous data streams
511    ///
512    /// # Cancellation
513    ///
514    /// The returned `Subscription` handle allows graceful cancellation. The subscription
515    /// is also automatically cancelled when dropped.
516    ///
517    /// # Examples
518    ///
519    /// ## Timer Stream
520    /// ```ignore
521    /// use tokio_stream::wrappers::IntervalStream;
522    /// use std::time::Duration;
523    ///
524    /// let interval = tokio::time::interval(Duration::from_secs(1));
525    /// let stream = IntervalStream::new(interval)
526    ///     .map(|_| Msg::Tick);
527    /// let subscription = program.subscribe(stream);
528    /// ```
529    ///
530    /// ## WebSocket Stream
531    /// ```ignore
532    /// let ws_stream = websocket.messages()
533    ///     .filter_map(|msg| msg.ok())
534    ///     .map(|msg| Msg::WebSocketMessage(msg));
535    /// let subscription = program.subscribe(ws_stream);
536    /// ```
537    ///
538    /// ## File Watcher Stream
539    /// ```ignore
540    /// let file_stream = watch_file("/path/to/file")
541    ///     .map(|event| Msg::FileChanged(event));
542    /// let subscription = program.subscribe(file_stream);
543    /// ```
544    pub fn subscribe<S>(&mut self, stream: S) -> Subscription
545    where
546        S: futures::Stream<Item = M::Message> + Send + 'static,
547        M::Message: Send + 'static,
548    {
549        use futures::StreamExt;
550        use tokio_util::sync::CancellationToken;
551
552        // Ensure we have a message channel
553        if self.message_tx.is_none() {
554            self.init_async_bridge();
555        }
556
557        let sender = self
558            .message_tx
559            .as_ref()
560            .expect("message_tx should be Some after init_async_bridge")
561            .clone();
562        let cancel_token = CancellationToken::new();
563        let cancel_clone = cancel_token.clone();
564
565        // Spawn task to poll the stream
566        let handle = self.command_executor.spawn(async move {
567            tokio::pin!(stream);
568
569            loop {
570                tokio::select! {
571                    _ = cancel_clone.cancelled() => {
572                        break; // Subscription was cancelled
573                    }
574                    item = stream.next() => {
575                        match item {
576                            Some(msg) => {
577                                if sender.send(Event::User(msg)).is_err() {
578                                    break; // Program has shut down
579                                }
580                            }
581                            None => {
582                                break; // Stream completed
583                            }
584                        }
585                    }
586                }
587            }
588        });
589
590        Subscription::new(handle, cancel_token)
591    }
592
593    /// Spawn a cancellable async operation
594    ///
595    /// Spawns a long-running async task with cooperative cancellation support.
596    /// The task receives a `CancellationToken` for checking cancellation status.
597    ///
598    /// # Use Cases
599    ///
600    /// - **Background Processing**: Data analysis, file processing
601    /// - **Network Operations**: Long polling, streaming downloads
602    /// - **Periodic Tasks**: Health checks, metrics collection
603    /// - **Resource Monitoring**: CPU/memory monitoring
604    /// - **Cleanup Tasks**: Temporary file cleanup, cache management
605    ///
606    /// # Cancellation Pattern
607    ///
608    /// Tasks should periodically check the cancellation token and exit gracefully
609    /// when cancelled. Use `tokio::select!` for responsive cancellation.
610    ///
611    /// # Examples
612    ///
613    /// ## Background File Processing
614    /// ```ignore
615    /// let handle = program.spawn_cancellable(|token| async move {
616    ///     for file in large_file_list {
617    ///         if token.is_cancelled() {
618    ///             return Err("Cancelled");
619    ///         }
620    ///         process_file(file).await?;
621    ///     }
622    ///     Ok("All files processed")
623    /// });
624    /// ```
625    ///
626    /// ## Long Polling
627    /// ```ignore
628    /// let handle = program.spawn_cancellable(|token| async move {
629    ///     loop {
630    ///         tokio::select! {
631    ///             _ = token.cancelled() => {
632    ///                 break Ok("Polling cancelled");
633    ///             }
634    ///             result = poll_server() => {
635    ///                 handle_result(result)?;
636    ///             }
637    ///         }
638    ///     }
639    /// });
640    ///
641    /// // Cancel when user navigates away
642    /// if user_navigated_away {
643    ///     handle.cancel().await;
644    /// }
645    /// ```
646    pub fn spawn_cancellable<F, Fut, T>(&self, f: F) -> AsyncHandle<T>
647    where
648        F: FnOnce(tokio_util::sync::CancellationToken) -> Fut,
649        Fut: std::future::Future<Output = T> + Send + 'static,
650        T: Send + 'static,
651    {
652        use tokio_util::sync::CancellationToken;
653
654        let cancel_token = CancellationToken::new();
655        let token_clone = cancel_token.clone();
656
657        let handle = self.command_executor.spawn(f(token_clone));
658
659        AsyncHandle::new(handle, cancel_token)
660    }
661
662    /// Spawn a cancellable async operation that sends messages
663    ///
664    /// Similar to spawn_cancellable but specifically for operations that produce
665    /// messages for the program.
666    ///
667    /// # Example
668    /// ```ignore
669    /// let handle = program.spawn_cancellable_cmd(|token, sender| async move {
670    ///     while !token.is_cancelled() {
671    ///         let data = fetch_data().await;
672    ///         let _ = sender.send(Event::User(Msg::DataReceived(data)));
673    ///         tokio::time::sleep(Duration::from_secs(1)).await;
674    ///     }
675    /// });
676    /// ```
677    pub fn spawn_cancellable_cmd<F, Fut>(&mut self, f: F) -> AsyncHandle<()>
678    where
679        F: FnOnce(tokio_util::sync::CancellationToken, mpsc::SyncSender<Event<M::Message>>) -> Fut,
680        Fut: std::future::Future<Output = ()> + Send + 'static,
681        M::Message: Send + 'static,
682    {
683        use tokio_util::sync::CancellationToken;
684
685        // Ensure we have a message channel
686        if self.message_tx.is_none() {
687            self.init_async_bridge();
688        }
689
690        let sender = self
691            .message_tx
692            .as_ref()
693            .expect("message_tx should be Some after init_async_bridge")
694            .clone();
695        let cancel_token = CancellationToken::new();
696        let token_clone = cancel_token.clone();
697
698        let handle = self.command_executor.spawn(f(token_clone, sender));
699
700        AsyncHandle::new(handle, cancel_token)
701    }
702
703    /// Spawn a simple async task that produces a message
704    ///
705    /// This is a simplified version of spawn_cancellable for tasks that don't need
706    /// cancellation support. The task runs to completion and sends its result.
707    ///
708    /// # Example
709    /// ```ignore
710    /// program.spawn(async {
711    ///     let data = fetch_data().await;
712    ///     Some(Msg::DataLoaded(data))
713    /// });
714    /// ```
715    pub fn spawn<Fut>(&mut self, fut: Fut)
716    where
717        Fut: std::future::Future<Output = Option<M::Message>> + Send + 'static,
718        M::Message: Send + 'static,
719    {
720        // Ensure we have a message channel
721        if self.message_tx.is_none() {
722            self.init_async_bridge();
723        }
724
725        let sender = self
726            .message_tx
727            .as_ref()
728            .expect("message_tx should be Some after init_async_bridge")
729            .clone();
730
731        self.command_executor.spawn(async move {
732            if let Some(msg) = fut.await {
733                let _ = sender.send(Event::User(msg));
734            }
735        });
736    }
737
738    /// Subscribe to an interval timer with a simple callback
739    ///
740    /// This is a simplified API for the common case of periodic events.
741    ///
742    /// # Example
743    /// ```ignore
744    /// program.subscribe_interval(Duration::from_secs(1), || Msg::Tick);
745    /// ```
746    pub fn subscribe_interval<F>(&mut self, interval: Duration, mut callback: F) -> Subscription
747    where
748        F: FnMut() -> M::Message + Send + 'static,
749        M::Message: Send + 'static,
750    {
751        use futures::stream::StreamExt;
752        
753        let stream = async_stream::stream! {
754            let mut interval = tokio::time::interval(interval);
755            loop {
756                interval.tick().await;
757                yield callback();
758            }
759        };
760        
761        self.subscribe(stream)
762    }
763
764    /// Subscribe to a stream with automatic error handling
765    ///
766    /// This helper automatically converts stream items and errors to messages.
767    ///
768    /// # Example
769    /// ```ignore
770    /// program.subscribe_with_error(
771    ///     my_stream,
772    ///     |item| Msg::Data(item),
773    ///     |error| Msg::Error(error.to_string())
774    /// );
775    /// ```
776    pub fn subscribe_with_error<S, T, E, F, G>(
777        &mut self,
778        stream: S,
779        on_item: F,
780        on_error: G,
781    ) -> Subscription
782    where
783        S: futures::Stream<Item = std::result::Result<T, E>> + Send + 'static,
784        F: Fn(T) -> M::Message + Send + 'static,
785        G: Fn(E) -> M::Message + Send + 'static,
786        M::Message: Send + 'static,
787    {
788        use futures::StreamExt;
789        
790        let mapped_stream = stream.map(move |result| match result {
791            Ok(item) => on_item(item),
792            Err(error) => on_error(error),
793        });
794        
795        self.subscribe(mapped_stream)
796    }
797
798    /// Run the program with a timeout (useful for testing)
799    ///
800    /// The program will automatically exit after the specified duration.
801    /// Returns Ok(()) if the timeout was reached or the program exited normally.
802    pub fn run_with_timeout(self, timeout: Duration) -> Result<()> {
803        let start = Instant::now();
804        self.run_internal(Some(timeout), Some(start), None)
805    }
806
807    /// Run the program until a condition is met (useful for testing)
808    ///
809    /// The condition function is called after each update with the current model state.
810    /// The program exits when the condition returns true.
811    pub fn run_until<F>(self, condition: F) -> Result<()>
812    where
813        F: FnMut(&M) -> bool + 'static,
814    {
815        self.run_with_condition(Some(Box::new(condition)))
816    }
817
818    /// Run the program until it exits
819    pub fn run(self) -> Result<()> {
820        self.run_internal(None, None, None)
821    }
822
823    /// Run with a condition check
824    fn run_with_condition(self, condition: Option<ConditionCheck<M>>) -> Result<()> {
825        self.run_internal(None, None, condition)
826    }
827
828    /// Internal run implementation with optional timeout and condition
829    fn run_internal(
830        mut self,
831        timeout: Option<Duration>,
832        start_time: Option<Instant>,
833        mut condition: Option<ConditionCheck<M>>,
834    ) -> Result<()> {
835        // Mark as running
836        self.running.store(true, Ordering::SeqCst);
837
838        let (crossterm_tx, crossterm_rx) = mpsc::sync_channel(100);
839
840        // Use existing message channel if initialized, otherwise create new one
841        let (message_tx, message_rx) = if let Some(rx) = self.message_rx.take() {
842            // Channel was already initialized via init_async_bridge()
843            let tx = self
844                .message_tx
845                .as_ref()
846                .expect("message_tx should be Some when message_rx is Some")
847                .clone();
848            (tx, rx)
849        } else {
850            let (tx, rx) = mpsc::sync_channel(100);
851            self.message_tx = Some(tx.clone());
852            (tx, rx)
853        };
854
855        // Spawn input thread if not headless
856        if !self.options.headless && !self.options.without_renderer {
857            let running = Arc::clone(&self.running);
858            let force_quit = Arc::clone(&self.force_quit);
859
860            let input_thread = thread::spawn(move || loop {
861                if !running.load(Ordering::SeqCst) || force_quit.load(Ordering::SeqCst) {
862                    break;
863                }
864
865                if event::poll(Duration::from_millis(100)).unwrap_or(false) {
866                    if let Ok(event) = event::read() {
867                        let _ = crossterm_tx.send(event);
868                    }
869                }
870            });
871            self.input_thread = Some(input_thread);
872        }
873
874        // Run initial command
875        let init_cmd = self.model.init();
876        if !init_cmd.is_noop() {
877            self.command_executor.execute(init_cmd, message_tx.clone());
878        }
879
880        // Main event loop
881        let tick_rate = Duration::from_millis(250);
882        loop {
883            if self.force_quit.load(Ordering::SeqCst) {
884                break;
885            }
886
887            // Check timeout if specified
888            if let (Some(timeout), Some(start)) = (timeout, start_time) {
889                if start.elapsed() >= timeout {
890                    break; // Timeout reached
891                }
892            }
893
894            // Process events with shorter timeout to allow checking conditions
895            let event_timeout = if timeout.is_some() {
896                Duration::from_millis(10) // Check more frequently when timeout is set
897            } else {
898                tick_rate
899            };
900            let event = if self.options.headless {
901                self.priority_processor
902                    .process_events_headless(&message_rx, event_timeout)
903            } else {
904                self.priority_processor
905                    .process_events(&message_rx, &crossterm_rx, event_timeout)
906            };
907
908            if let Some(event) = event {
909                // Check for quit
910                if matches!(event, Event::Quit) {
911                    break;
912                }
913
914                // Apply filter if present
915                let event = if let Some(ref filter) = self.filter {
916                    filter(&self.model, event)
917                } else {
918                    Some(event)
919                };
920
921                // Update model
922                if let Some(event) = event {
923                    let cmd = self.model.update(event);
924
925                    // Check if command is quit
926                    if cmd.is_quit() {
927                        break;
928                    }
929
930                    // Execute the command if it's not a no-op
931                    if !cmd.is_noop() {
932                        self.command_executor.execute(cmd, message_tx.clone());
933                    }
934                }
935            }
936
937            // Check condition if specified
938            if let Some(ref mut cond) = condition {
939                if cond(&self.model) {
940                    break; // Condition met
941                }
942            }
943
944            // Render if needed and FPS allows
945            if !self.options.without_renderer && self.fps_limiter.should_render() {
946                self.terminal_manager
947                    .draw(|f| {
948                        let area = f.area();
949                        self.model.view(f, area);
950                    })
951                    .map_err(Error::from)?;
952                self.fps_limiter.mark_rendered();
953            }
954        }
955
956        // Log final statistics before cleanup
957        log::info!(
958            "Program shutting down. Final stats: {}",
959            self.event_stats_string()
960        );
961
962        // Cleanup
963        self.running.store(false, Ordering::SeqCst);
964        self.terminal_manager.cleanup().map_err(Error::from)?;
965
966        Ok(())
967    }
968}
969
970impl<M: Model> Drop for Program<M> {
971    fn drop(&mut self) {
972        let _ = self.terminal_manager.cleanup();
973
974        // Stop the input thread
975        self.running.store(false, Ordering::SeqCst);
976        self.force_quit.store(true, Ordering::SeqCst);
977
978        if let Some(thread) = self.input_thread.take() {
979            let _ = thread.join();
980        }
981    }
982}
983
#[cfg(test)]
mod tests {
    use super::*;
    use hojicha_core::core::Cmd;

    /// Chains every `ProgramOptions` builder method and checks that each one
    /// is reflected in the corresponding field.
    #[test]
    fn test_program_options_all_methods() {
        let opts = ProgramOptions::default()
            .with_mouse_mode(MouseMode::CellMotion)
            .with_alt_screen(true)
            .with_bracketed_paste(true)
            .with_focus_reporting(true)
            .with_fps(120)
            .headless()
            .without_signal_handler()
            .without_renderer();

        // Non-boolean settings.
        assert_eq!(opts.mouse_mode, MouseMode::CellMotion);
        assert_eq!(opts.fps, 120);

        // Flags that the builder calls above switch on.
        assert!(opts.alt_screen);
        assert!(opts.bracketed_paste);
        assert!(opts.focus_reporting);
        assert!(opts.headless);
        assert!(opts.without_renderer);

        // `without_signal_handler` switches this one off.
        assert!(!opts.install_signal_handler);
    }

    /// The default mouse mode is `MouseMode::None`.
    #[test]
    fn test_mouse_mode_default() {
        let mode = MouseMode::default();
        assert_eq!(mode, MouseMode::None);
    }

    /// Builds a headless program and drops it straight away; the test passes
    /// as long as construction succeeds and dropping does not panic.
    #[test]
    fn test_program_drop() {
        struct NoopModel;
        impl Model for NoopModel {
            type Message = ();
            fn update(&mut self, _event: Event<Self::Message>) -> Cmd<Self::Message> {
                Cmd::none()
            }
            fn view(&self, _frame: &mut ratatui::Frame, _area: ratatui::layout::Rect) {}
        }

        let headless = ProgramOptions::default().headless();
        let program = Program::with_options(NoopModel, headless).unwrap();
        drop(program);
    }

    /// Smoke test: calls the printing, quit/kill and terminal release/restore
    /// methods on a headless program. No results are asserted.
    #[test]
    fn test_program_methods() {
        struct StringModel;
        impl Model for StringModel {
            type Message = String;
            fn update(&mut self, _event: Event<Self::Message>) -> Cmd<Self::Message> {
                Cmd::none()
            }
            fn view(&self, _frame: &mut ratatui::Frame, _area: ratatui::layout::Rect) {}
        }

        let headless = ProgramOptions::default().headless();
        let mut program = Program::with_options(StringModel, headless).unwrap();

        // Printing helpers.
        program.println("test");
        program.printf(format_args!("test {}", 42));

        // Shutdown requests.
        program.quit();
        program.kill();

        // Terminal hand-off; the results are ignored.
        let _ = program.release_terminal();
        let _ = program.restore_terminal();
    }

    /// Attaches a filter closure to a headless program. Only checks that the
    /// closure is accepted by `with_filter`; the closure is never invoked here.
    #[test]
    fn test_program_with_filter() {
        struct IntModel;
        impl Model for IntModel {
            type Message = i32;
            fn update(&mut self, _event: Event<Self::Message>) -> Cmd<Self::Message> {
                Cmd::none()
            }
            fn view(&self, _frame: &mut ratatui::Frame, _area: ratatui::layout::Rect) {}
        }

        let headless = ProgramOptions::default().headless();
        let program = Program::with_options(IntModel, headless).unwrap();

        // Suppress user messages above 5 and pass every other event through.
        let _filtered = program.with_filter(|_model, event| {
            if matches!(event, Event::User(n) if n > 5) {
                None
            } else {
                Some(event)
            }
        });
    }
}