hojicha_runtime/
program.rs

1//! The main Program struct that runs the application
2//!
3//! ## Async Task Spawning Methods
4//!
5//! The Program struct provides several methods for spawning async tasks,
6//! each optimized for different use cases:
7//!
8//! ### `commands::spawn()` - Simple Async Task Spawning
9//! - **Purpose**: Simple fire-and-forget async operations
10//! - **Use cases**: HTTP requests, file I/O, simple background work
11//! - **Lifecycle**: Runs to completion, sends optional result message
12//! - **Cancellation**: No built-in cancellation support
13//! - **Example**: One-time data fetch, file save operation
14//!
//! ### `Program::spawn()` - Runtime-Managed Task Spawning
//! - **Purpose**: Tasks whose result is delivered to the running program
//! - **Use cases**: Background work that reports back with a single optional message
//! - **Lifecycle**: Managed by program runtime, automatic cleanup
//! - **Cancellation**: No built-in cancellation support
//! - **Example**: Background data processing that reports its completion
21//!
22//! ### `Program::spawn_cancellable()` - Cancellable Task Handles
23//! - **Purpose**: Long-running tasks that may need to be cancelled
24//! - **Use cases**: User-cancellable operations, background monitors
25//! - **Lifecycle**: Returns AsyncHandle for manual cancellation
26//! - **Cancellation**: Cooperative cancellation via CancellationToken
27//! - **Example**: File upload/download, long-running computation
28//!
29//! ### `Program::spawn_cancellable_cmd()` - Cancellable Message-Sending Tasks
30//! - **Purpose**: Long-running tasks that send messages AND can be cancelled
31//! - **Use cases**: Real-time data streams, periodic background tasks
32//! - **Lifecycle**: Returns AsyncHandle, integrated with message passing
33//! - **Cancellation**: Cooperative cancellation via CancellationToken
34//! - **Example**: Live data feeds, periodic health checks
35//!
36//! ## Choosing the Right Method
37//!
38//! ```ignore
39//! // Simple one-shot operation
40//! let cmd = commands::spawn(async {
41//!     let data = fetch_data().await;
42//!     Some(Msg::DataLoaded(data))
43//! });
44//!
//! // Background task that reports completion with a single message
46//! program.spawn(async {
47//!     process_large_file().await;
48//!     Some(Msg::ProcessingComplete)
49//! });
50//!
51//! // User-cancellable operation
52//! let handle = program.spawn_cancellable(|token| async move {
53//!     expensive_computation(token).await
54//! });
55//!
56//! // Real-time stream with cancellation
57//! let handle = program.spawn_cancellable_cmd(|token, sender| async move {
58//!     while !token.is_cancelled() {
59//!         let data = stream.next().await;
60//!         sender.send(Event::User(Msg::StreamData(data)));
61//!     }
62//! });
63//! ```
64
65// Module components
66mod command_executor;
67pub mod error_handler;
68mod event_processor;
69mod fps_limiter;
70mod priority_event_processor;
71mod terminal_manager;
72
73pub use command_executor::CommandExecutor;
74pub use event_processor::EventProcessor;
75pub use fps_limiter::FpsLimiter;
76pub use priority_event_processor::{
77    get_event_stats, EventStats, PriorityConfig, PriorityEventProcessor,
78};
79pub use terminal_manager::{TerminalConfig, TerminalManager};
80
81// Re-export the main types from program_old.rs for backward compatibility
82// We'll gradually migrate the implementation to use the extracted components
83
84use crate::async_handle::AsyncHandle;
85use crate::panic_recovery::{self, PanicRecoveryStrategy};
86use crate::resource_limits::ResourceLimits;
87use crate::string_renderer::StringRenderer;
88use crate::subscription::Subscription;
89use crossterm::event::{self};
90use hojicha_core::core::Model;
91use hojicha_core::error::{Error, Result};
92use hojicha_core::event::Event;
93use std::io::{self, Read, Write};
94use std::sync::atomic::{AtomicBool, Ordering};
95use std::sync::{mpsc, Arc};
96use std::thread;
97use std::time::{Duration, Instant};
98
/// Boxed callback that inspects an event before it reaches the model.
///
/// The callback receives a shared reference to the model plus the event and
/// returns either `Some(event)` (possibly a different event) or `None`.
/// NOTE(review): presumably `None` drops the event; the consumer of this
/// filter is not visible in this part of the file, so confirm there.
type MessageFilter<M> = Box<
    dyn Fn(&M, Event<<M as Model>::Message>) -> Option<Event<<M as Model>::Message>> + Send + Sync,
>;

/// Boxed predicate over the model, handed to the run loop by `Program::run_until`.
type ConditionCheck<M> = Box<dyn FnMut(&M) -> bool>;
106
/// Mouse tracking mode requested from the terminal.
///
/// The default is [`MouseMode::None`]. The enum is fieldless, so it is `Copy`
/// and supports total equality and hashing (usable as a map/set key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum MouseMode {
    /// No mouse tracking
    #[default]
    None,
    /// Track mouse events only when buttons are pressed
    CellMotion,
    /// Track all mouse movement, even without button presses
    AllMotion,
}
118
/// Options for configuring the program.
///
/// Build with [`ProgramOptions::new`] (or `Default`) and adjust through the
/// chainable `with_*` / `without_*` / `headless` methods.
pub struct ProgramOptions {
    /// Whether to use alternate screen (`true` by default)
    pub alt_screen: bool,
    /// Mouse tracking mode (`MouseMode::None` by default)
    pub mouse_mode: MouseMode,
    /// Enable bracketed paste mode (`false` by default)
    pub bracketed_paste: bool,
    /// Enable focus reporting (`false` by default)
    pub focus_reporting: bool,
    /// Frames per second (0 = unlimited; 60 by default)
    pub fps: u16,
    /// Run in headless mode without rendering (`false` by default)
    pub headless: bool,
    /// Whether signal handlers are installed (`true` by default);
    /// `without_signal_handler()` sets this to `false`
    pub install_signal_handler: bool,
    /// Disable renderer (`false` by default); like `headless`, this makes
    /// `Program::with_options` configure the terminal as headless
    pub without_renderer: bool,
    /// Custom output writer (`None` by default)
    pub output: Option<Box<dyn Write + Send + Sync>>,
    /// Custom input reader (`None` by default)
    pub input: Option<Box<dyn Read + Send + Sync>>,
    /// Panic recovery strategy for Model methods
    pub panic_recovery_strategy: PanicRecoveryStrategy,
    /// Resource limits for async task execution
    pub resource_limits: ResourceLimits,
}
146
147impl ProgramOptions {
148    /// Create new default options
149    pub fn new() -> Self {
150        Self {
151            alt_screen: true,
152            mouse_mode: MouseMode::None,
153            bracketed_paste: false,
154            focus_reporting: false,
155            fps: 60,
156            headless: false,
157            install_signal_handler: true,
158            without_renderer: false,
159            output: None,
160            input: None,
161            panic_recovery_strategy: PanicRecoveryStrategy::default(),
162            resource_limits: ResourceLimits::default(),
163        }
164    }
165
166    /// Enable or disable alternate screen
167    pub fn with_alt_screen(mut self, enable: bool) -> Self {
168        self.alt_screen = enable;
169        self
170    }
171
172    /// Set mouse tracking mode
173    pub fn with_mouse_mode(mut self, mode: MouseMode) -> Self {
174        self.mouse_mode = mode;
175        self
176    }
177
178    /// Enable bracketed paste mode
179    pub fn with_bracketed_paste(mut self, enable: bool) -> Self {
180        self.bracketed_paste = enable;
181        self
182    }
183
184    /// Enable focus reporting
185    pub fn with_focus_reporting(mut self, enable: bool) -> Self {
186        self.focus_reporting = enable;
187        self
188    }
189
190    /// Set frames per second
191    pub fn with_fps(mut self, fps: u16) -> Self {
192        self.fps = fps;
193        self
194    }
195
196    /// Run in headless mode
197    pub fn headless(mut self) -> Self {
198        self.headless = true;
199        self
200    }
201
202    /// Disable signal handlers
203    pub fn without_signal_handler(mut self) -> Self {
204        self.install_signal_handler = false;
205        self
206    }
207
208    /// Disable renderer
209    pub fn without_renderer(mut self) -> Self {
210        self.without_renderer = true;
211        self
212    }
213
214    /// Set panic recovery strategy for Model methods
215    pub fn with_panic_recovery(mut self, strategy: PanicRecoveryStrategy) -> Self {
216        self.panic_recovery_strategy = strategy;
217        self
218    }
219
220    /// Set custom output
221    pub fn with_output(mut self, output: Box<dyn Write + Send + Sync>) -> Self {
222        self.output = Some(output);
223        self
224    }
225
226    /// Set custom input source
227    pub fn with_input(mut self, input: Box<dyn Read + Send + Sync>) -> Self {
228        self.input = Some(input);
229        self
230    }
231
232    /// Set input from a string (convenience method for testing)
233    pub fn with_input_string(mut self, input: &str) -> Self {
234        use std::io::Cursor;
235        self.input = Some(Box::new(Cursor::new(input.as_bytes().to_vec())));
236        self
237    }
238
239    /// Set resource limits for async task execution
240    pub fn with_resource_limits(mut self, limits: ResourceLimits) -> Self {
241        self.resource_limits = limits;
242        self
243    }
244}
245
246impl Default for ProgramOptions {
247    fn default() -> Self {
248        Self::new()
249    }
250}
251
/// The main program that runs your application
///
/// Owns the model together with the runtime components (terminal, command
/// executor, FPS limiter, priority event processor) and the channel used to
/// inject events from outside the event loop.
pub struct Program<M: Model> {
    // The application model supplied by the caller.
    model: M,
    // Options the program was created with.
    options: ProgramOptions,
    // Terminal handling; see `release_terminal` / `restore_terminal`.
    terminal_manager: TerminalManager,
    // Spawns the async tasks behind `spawn*` and `subscribe*`.
    command_executor: CommandExecutor<M::Message>,
    // Created from `options.fps`.
    fps_limiter: FpsLimiter,
    // Sending half of the event channel; `None` until `init_async_bridge`
    // (or the run loop) creates the channel.
    message_tx: Option<mpsc::SyncSender<Event<M::Message>>>,
    // Receiving half of the event channel; taken by the run loop.
    message_rx: Option<mpsc::Receiver<Event<M::Message>>>,
    // Source of event statistics, metrics and queue sizing.
    priority_processor: PriorityEventProcessor<M::Message>,
    // Optional event filter installed via `with_filter`.
    filter: Option<MessageFilter<M>>,
    // Set to `true` when the run loop starts; cleared by `kill`.
    running: Arc<AtomicBool>,
    // Set by `kill`.
    force_quit: Arc<AtomicBool>,
    // Handle of the input thread, if one was spawned.
    input_thread: Option<thread::JoinHandle<()>>,
    // String-based renderer
    string_renderer: StringRenderer,
}
269
270impl<M: Model> Program<M>
271where
272    M::Message: Clone,
273{
274    /// Create a new program with the given model
275    pub fn new(model: M) -> Result<Self> {
276        Self::with_options(model, ProgramOptions::default())
277    }
278
279    /// Create a new program with custom options
280    pub fn with_options(model: M, options: ProgramOptions) -> Result<Self> {
281        // Create terminal manager
282        let terminal_config = TerminalConfig {
283            alt_screen: options.alt_screen,
284            mouse_mode: options.mouse_mode,
285            bracketed_paste: options.bracketed_paste,
286            focus_reporting: options.focus_reporting,
287            headless: options.headless || options.without_renderer,
288        };
289        let terminal_manager = TerminalManager::new(terminal_config)?;
290
291        // Create command executor with resource limits
292        let command_executor =
293            CommandExecutor::with_resource_limits(options.resource_limits.clone())?;
294
295        // Create FPS limiter
296        let fps_limiter = FpsLimiter::new(options.fps);
297
298        // Create priority event processor with default config
299        let priority_processor = PriorityEventProcessor::new();
300
301        log::info!("Hojicha program initialized with priority event processing");
302
303        Ok(Self {
304            model,
305            options,
306            terminal_manager,
307            command_executor,
308            fps_limiter,
309            message_tx: None,
310            message_rx: None,
311            priority_processor,
312            filter: None,
313            running: Arc::new(AtomicBool::new(false)),
314            force_quit: Arc::new(AtomicBool::new(false)),
315            input_thread: None,
316            string_renderer: StringRenderer::new(),
317        })
318    }
319
320    /// Set a message filter function
321    pub fn with_filter<F>(mut self, filter: F) -> Self
322    where
323        F: Fn(&M, Event<M::Message>) -> Option<Event<M::Message>> + Send + Sync + 'static,
324    {
325        self.filter = Some(Box::new(filter));
326        self
327    }
328
329    /// Configure the priority event processor
330    pub fn with_priority_config(mut self, config: PriorityConfig) -> Self {
331        self.priority_processor = PriorityEventProcessor::with_config(config);
332        log::debug!("Priority processor configured with custom settings");
333        self
334    }
335
336    /// Get current event processing statistics
337    pub fn event_stats(&self) -> EventStats {
338        self.priority_processor.stats()
339    }
340
341    /// Get a formatted string of event statistics (useful for debugging)
342    pub fn event_stats_string(&self) -> String {
343        get_event_stats(&self.priority_processor)
344    }
345
346    /// Print formatted text to stderr
347    pub fn printf(&self, args: std::fmt::Arguments) {
348        eprint!("{args}");
349        let _ = io::stderr().flush();
350    }
351
352    /// Print a line to stderr
353    pub fn println(&self, text: &str) {
354        eprintln!("{text}");
355        let _ = io::stderr().flush();
356    }
357
358    /// Send a quit message to the running program
359    pub fn quit(&self) {
360        if let Some(tx) = &self.message_tx {
361            let _ = tx.send(Event::Quit);
362        }
363    }
364
365    /// Force kill the program immediately
366    ///
367    /// This method forcefully terminates the program without any cleanup.
368    /// Unlike `quit()` which sends a quit message through the event loop,
369    /// `kill()` immediately stops execution.
370    ///
371    /// # Use Cases
372    /// - Emergency shutdown when the program is unresponsive
373    /// - Testing scenarios requiring immediate termination
374    /// - Signal handlers that need to stop the program immediately
375    ///
376    /// # Example
377    /// ```ignore
378    /// // In a signal handler or emergency shutdown
379    /// program.kill();
380    /// ```
381    ///
382    /// # Note
383    /// This bypasses normal cleanup procedures. Prefer `quit()` for graceful shutdown.
384    pub fn kill(&self) {
385        self.force_quit.store(true, Ordering::SeqCst);
386        self.running.store(false, Ordering::SeqCst);
387    }
388
389    /// Get advanced performance metrics
390    pub fn metrics(&self) -> crate::metrics::AdvancedEventStats {
391        self.priority_processor.advanced_metrics()
392    }
393
394    /// Get resource usage statistics
395    pub fn resource_stats(&self) -> crate::resource_limits::ResourceStats {
396        self.command_executor.resource_stats()
397    }
398
399    /// Export metrics in JSON format
400    pub fn metrics_json(&self) -> String {
401        self.priority_processor.metrics_collector().export_json()
402    }
403
404    /// Export metrics in Prometheus format
405    pub fn metrics_prometheus(&self) -> String {
406        self.priority_processor
407            .metrics_collector()
408            .export_prometheus()
409    }
410
411    /// Export metrics in plain text format
412    pub fn metrics_text(&self) -> String {
413        self.priority_processor.metrics_collector().export_text()
414    }
415
416    /// Display metrics dashboard (for debugging)
417    pub fn metrics_dashboard(&self) -> String {
418        let stats = self.priority_processor.advanced_metrics();
419        crate::metrics::display_dashboard(&stats)
420    }
421
422    /// Reset all metrics
423    pub fn reset_metrics(&self) {
424        self.priority_processor.reset_stats();
425    }
426
427    /// Dynamically resize the event queue
428    pub fn resize_queue(&self, new_size: usize) -> Result<()> {
429        self.priority_processor
430            .resize_queue(new_size)
431            .map_err(|e| Error::Custom(Box::new(e)))
432    }
433
434    /// Get current queue capacity
435    pub fn queue_capacity(&self) -> usize {
436        self.priority_processor.queue_capacity()
437    }
438
439    /// Enable auto-scaling with the specified configuration
440    pub fn enable_auto_scaling(
441        &mut self,
442        config: crate::queue_scaling::AutoScaleConfig,
443    ) -> &mut Self {
444        self.priority_processor.enable_auto_scaling(config);
445        self
446    }
447
448    /// Enable auto-scaling with default configuration
449    pub fn with_auto_scaling(mut self) -> Self {
450        self.priority_processor
451            .enable_auto_scaling(crate::queue_scaling::AutoScaleConfig::default());
452        self
453    }
454
455    /// Disable auto-scaling
456    pub fn disable_auto_scaling(&mut self) -> &mut Self {
457        self.priority_processor.disable_auto_scaling();
458        self
459    }
460
461    /// Get a sender for injecting events from external sources
462    ///
463    /// This allows external async tasks to send messages to the program's event loop.
464    /// The sender is thread-safe and can be cloned for use in multiple threads.
465    ///
466    /// # Example
467    /// ```ignore
468    /// let mut program = Program::new(model)?;
469    /// let sender = program.sender();
470    ///
471    /// std::thread::spawn(move || {
472    ///     loop {
473    ///         std::thread::sleep(Duration::from_secs(1));
474    ///         let _ = sender.send(Event::User(Msg::Tick));
475    ///     }
476    /// });
477    ///
478    /// program.run()?;
479    /// ```
480    pub fn sender(&self) -> Option<mpsc::SyncSender<Event<M::Message>>> {
481        self.message_tx.clone()
482    }
483
484    /// Send a user message to the program
485    ///
486    /// Convenience method that wraps the message in Event::User.
487    ///
488    /// # Example
489    /// ```ignore
490    /// program.send_message(Msg::DataLoaded(data))?;
491    /// ```
492    pub fn send_message(&self, msg: M::Message) -> Result<()> {
493        self.message_tx
494            .as_ref()
495            .ok_or_else(|| {
496                Error::from(io::Error::new(
497                    io::ErrorKind::NotConnected,
498                    "Program not running",
499                ))
500            })?
501            .send(Event::User(msg))
502            .map_err(|_| {
503                Error::from(io::Error::new(
504                    io::ErrorKind::BrokenPipe,
505                    "Receiver disconnected",
506                ))
507            })
508    }
509
510    /// Wait for the program to finish
511    pub fn wait(&self) {
512        while !self.running.load(Ordering::SeqCst) && !self.force_quit.load(Ordering::SeqCst) {
513            thread::sleep(Duration::from_millis(1));
514        }
515        while self.running.load(Ordering::SeqCst) && !self.force_quit.load(Ordering::SeqCst) {
516            thread::sleep(Duration::from_millis(10));
517        }
518    }
519
520    /// Release the terminal
521    pub fn release_terminal(&mut self) -> Result<()> {
522        self.terminal_manager.release().map_err(Error::from)
523    }
524
525    /// Restore the terminal
526    pub fn restore_terminal(&mut self) -> Result<()> {
527        self.terminal_manager.restore().map_err(Error::from)
528    }
529
530    /// Initialize the async message bridge for external event injection
531    ///
532    /// This sets up channels for external systems to send messages into the program's
533    /// event loop. Must be called before `run()` if you need external message injection.
534    ///
535    /// # Use Cases
536    ///
537    /// - **WebSocket Integration**: Forward messages from WebSocket connections
538    /// - **File Watchers**: Send events when files change
539    /// - **Timers**: Implement custom scheduling beyond tick/every
540    /// - **Database Listeners**: Forward change notifications
541    /// - **IPC**: Receive messages from other processes
542    /// - **HTTP Responses**: Send results from async HTTP requests
543    ///
544    /// # Thread Safety
545    ///
546    /// The returned sender can be cloned and shared across multiple threads safely.
547    /// Messages are queued with a capacity of 100 by default.
548    ///
549    /// # Example
550    ///
551    /// ```no_run
552    /// use hojicha_runtime::{Program, Event};
553    /// use hojicha_core::{Model, Cmd};
554    ///
555    /// #[derive(Clone, Debug)]
556    /// struct MyModel;
557    ///
558    /// #[derive(Clone, Debug)]  
559    /// enum MyMsg { Hello }
560    ///
561    /// impl Model for MyModel {
562    ///     type Message = MyMsg;
563    ///     fn init(&mut self) -> Cmd<Self::Message> { Cmd::noop() }
564    ///     fn update(&mut self, _msg: Event<Self::Message>) -> Cmd<Self::Message> { Cmd::noop() }
565    ///     fn view(&self) -> String { "test".to_string() }
566    /// }
567    ///
568    /// let mut program = Program::new(MyModel).unwrap();
569    /// let sender = program.init_async_bridge();
570    ///
571    /// // Send an external event from another thread
572    /// sender.send(Event::User(MyMsg::Hello)).ok();
573    /// ```
574    pub fn init_async_bridge(&mut self) -> mpsc::SyncSender<Event<M::Message>> {
575        if self.message_tx.is_none() {
576            let (message_tx, message_rx) = mpsc::sync_channel(100);
577            self.message_tx = Some(message_tx.clone());
578            self.message_rx = Some(message_rx);
579            message_tx
580        } else {
581            // Safe to unwrap here because we just checked is_none() above
582            self.message_tx
583                .as_ref()
584                .expect("message_tx should be Some after init check")
585                .clone()
586        }
587    }
588
589    /// Subscribe to an async stream of events
590    ///
591    /// Connects any `futures::Stream` to your program's event loop. Each item from
592    /// the stream is automatically wrapped in `Event::User` and sent to your model's
593    /// update function.
594    ///
595    /// # Use Cases
596    ///
597    /// - **WebSocket/SSE**: Stream real-time messages
598    /// - **File Watching**: Monitor file system changes
599    /// - **Periodic Tasks**: Custom intervals and scheduling
600    /// - **Database Changes**: Listen to change streams
601    /// - **Sensor Data**: Process continuous data streams
602    ///
603    /// # Cancellation
604    ///
605    /// The returned `Subscription` handle allows graceful cancellation. The subscription
606    /// is also automatically cancelled when dropped.
607    ///
608    /// # Examples
609    ///
610    /// ## Timer Stream
611    /// ```ignore
612    /// use tokio_stream::wrappers::IntervalStream;
613    /// use std::time::Duration;
614    ///
615    /// let interval = tokio::time::interval(Duration::from_secs(1));
616    /// let stream = IntervalStream::new(interval)
617    ///     .map(|_| Msg::Tick);
618    /// let subscription = program.subscribe(stream);
619    /// ```
620    ///
621    /// ## WebSocket Stream
622    /// ```ignore
623    /// let ws_stream = websocket.messages()
624    ///     .filter_map(|msg| msg.ok())
625    ///     .map(|msg| Msg::WebSocketMessage(msg));
626    /// let subscription = program.subscribe(ws_stream);
627    /// ```
628    ///
629    /// ## File Watcher Stream
630    /// ```ignore
631    /// let file_stream = watch_file("/path/to/file")
632    ///     .map(|event| Msg::FileChanged(event));
633    /// let subscription = program.subscribe(file_stream);
634    /// ```
635    pub fn subscribe<S>(&mut self, stream: S) -> Subscription
636    where
637        S: futures::Stream<Item = M::Message> + Send + 'static,
638        M::Message: Send + 'static,
639    {
640        use futures::StreamExt;
641        use tokio_util::sync::CancellationToken;
642
643        // Ensure we have a message channel
644        if self.message_tx.is_none() {
645            self.init_async_bridge();
646        }
647
648        let sender = self
649            .message_tx
650            .as_ref()
651            .expect("message_tx should be Some after init_async_bridge")
652            .clone();
653        let cancel_token = CancellationToken::new();
654        let cancel_clone = cancel_token.clone();
655
656        // Spawn task to poll the stream
657        let handle = self.command_executor.spawn(async move {
658            tokio::pin!(stream);
659
660            loop {
661                tokio::select! {
662                    _ = cancel_clone.cancelled() => {
663                        break; // Subscription was cancelled
664                    }
665                    item = stream.next() => {
666                        match item {
667                            Some(msg) => {
668                                if sender.send(Event::User(msg)).is_err() {
669                                    break; // Program has shut down
670                                }
671                            }
672                            None => {
673                                break; // Stream completed
674                            }
675                        }
676                    }
677                }
678            }
679        });
680
681        Subscription::new(handle, cancel_token)
682    }
683
684    /// Spawn a cancellable async operation
685    ///
686    /// Spawns a long-running async task with cooperative cancellation support.
687    /// The task receives a `CancellationToken` for checking cancellation status.
688    ///
689    /// # Use Cases
690    ///
691    /// - **Background Processing**: Data analysis, file processing
692    /// - **Network Operations**: Long polling, streaming downloads
693    /// - **Periodic Tasks**: Health checks, metrics collection
694    /// - **Resource Monitoring**: CPU/memory monitoring
695    /// - **Cleanup Tasks**: Temporary file cleanup, cache management
696    ///
697    /// # Cancellation Pattern
698    ///
699    /// Tasks should periodically check the cancellation token and exit gracefully
700    /// when cancelled. Use `tokio::select!` for responsive cancellation.
701    ///
702    /// # Examples
703    ///
704    /// ## Background File Processing
705    /// ```ignore
706    /// let handle = program.spawn_cancellable(|token| async move {
707    ///     for file in large_file_list {
708    ///         if token.is_cancelled() {
709    ///             return Err("Cancelled");
710    ///         }
711    ///         process_file(file).await?;
712    ///     }
713    ///     Ok("All files processed")
714    /// });
715    /// ```
716    ///
717    /// ## Long Polling
718    /// ```ignore
719    /// let handle = program.spawn_cancellable(|token| async move {
720    ///     loop {
721    ///         tokio::select! {
722    ///             _ = token.cancelled() => {
723    ///                 break Ok("Polling cancelled");
724    ///             }
725    ///             result = poll_server() => {
726    ///                 handle_result(result)?;
727    ///             }
728    ///         }
729    ///     }
730    /// });
731    ///
732    /// // Cancel when user navigates away
733    /// if user_navigated_away {
734    ///     handle.cancel().await;
735    /// }
736    /// ```
737    pub fn spawn_cancellable<F, Fut, T>(&self, f: F) -> AsyncHandle<T>
738    where
739        F: FnOnce(tokio_util::sync::CancellationToken) -> Fut,
740        Fut: std::future::Future<Output = T> + Send + 'static,
741        T: Send + 'static,
742    {
743        use tokio_util::sync::CancellationToken;
744
745        let cancel_token = CancellationToken::new();
746        let token_clone = cancel_token.clone();
747
748        let handle = self.command_executor.spawn(f(token_clone));
749
750        AsyncHandle::new(handle, cancel_token)
751    }
752
753    /// Spawn a cancellable async operation that sends messages
754    ///
755    /// Similar to spawn_cancellable but specifically for operations that produce
756    /// messages for the program.
757    ///
758    /// # Example
759    /// ```ignore
760    /// let handle = program.spawn_cancellable_cmd(|token, sender| async move {
761    ///     while !token.is_cancelled() {
762    ///         let data = fetch_data().await;
763    ///         let _ = sender.send(Event::User(Msg::DataReceived(data)));
764    ///         tokio::time::sleep(Duration::from_secs(1)).await;
765    ///     }
766    /// });
767    /// ```
768    pub fn spawn_cancellable_cmd<F, Fut>(&mut self, f: F) -> AsyncHandle<()>
769    where
770        F: FnOnce(tokio_util::sync::CancellationToken, mpsc::SyncSender<Event<M::Message>>) -> Fut,
771        Fut: std::future::Future<Output = ()> + Send + 'static,
772        M::Message: Send + 'static,
773    {
774        use tokio_util::sync::CancellationToken;
775
776        // Ensure we have a message channel
777        if self.message_tx.is_none() {
778            self.init_async_bridge();
779        }
780
781        let sender = self
782            .message_tx
783            .as_ref()
784            .expect("message_tx should be Some after init_async_bridge")
785            .clone();
786        let cancel_token = CancellationToken::new();
787        let token_clone = cancel_token.clone();
788
789        let handle = self.command_executor.spawn(f(token_clone, sender));
790
791        AsyncHandle::new(handle, cancel_token)
792    }
793
794    /// Spawn a simple async task that produces a message
795    ///
796    /// This is a simplified version of spawn_cancellable for tasks that don't need
797    /// cancellation support. The task runs to completion and sends its result.
798    ///
799    /// # Example
800    /// ```ignore
801    /// program.spawn(async {
802    ///     let data = fetch_data().await;
803    ///     Some(Msg::DataLoaded(data))
804    /// });
805    /// ```
806    pub fn spawn<Fut>(&mut self, fut: Fut)
807    where
808        Fut: std::future::Future<Output = Option<M::Message>> + Send + 'static,
809        M::Message: Send + 'static,
810    {
811        // Ensure we have a message channel
812        if self.message_tx.is_none() {
813            self.init_async_bridge();
814        }
815
816        let sender = self
817            .message_tx
818            .as_ref()
819            .expect("message_tx should be Some after init_async_bridge")
820            .clone();
821
822        self.command_executor.spawn(async move {
823            if let Some(msg) = fut.await {
824                let _ = sender.send(Event::User(msg));
825            }
826        });
827    }
828
829    /// Subscribe to an interval timer with a simple callback
830    ///
831    /// This is a simplified API for the common case of periodic events.
832    ///
833    /// # Example
834    /// ```ignore
835    /// program.subscribe_interval(Duration::from_secs(1), || Msg::Tick);
836    /// ```
837    pub fn subscribe_interval<F>(&mut self, interval: Duration, mut callback: F) -> Subscription
838    where
839        F: FnMut() -> M::Message + Send + 'static,
840        M::Message: Send + 'static,
841    {
842        let stream = async_stream::stream! {
843            let mut interval = tokio::time::interval(interval);
844            loop {
845                interval.tick().await;
846                yield callback();
847            }
848        };
849
850        self.subscribe(stream)
851    }
852
853    /// Subscribe to a stream with automatic error handling
854    ///
855    /// This helper automatically converts stream items and errors to messages.
856    ///
857    /// # Example
858    /// ```ignore
859    /// program.subscribe_with_error(
860    ///     my_stream,
861    ///     |item| Msg::Data(item),
862    ///     |error| Msg::Error(error.to_string())
863    /// );
864    /// ```
865    pub fn subscribe_with_error<S, T, E, F, G>(
866        &mut self,
867        stream: S,
868        on_item: F,
869        on_error: G,
870    ) -> Subscription
871    where
872        S: futures::Stream<Item = std::result::Result<T, E>> + Send + 'static,
873        F: Fn(T) -> M::Message + Send + 'static,
874        G: Fn(E) -> M::Message + Send + 'static,
875        M::Message: Send + 'static,
876    {
877        use futures::StreamExt;
878
879        let mapped_stream = stream.map(move |result| match result {
880            Ok(item) => on_item(item),
881            Err(error) => on_error(error),
882        });
883
884        self.subscribe(mapped_stream)
885    }
886
887    /// Run the program with a timeout (useful for testing)
888    ///
889    /// The program will automatically exit after the specified duration.
890    /// Returns Ok(()) if the timeout was reached or the program exited normally.
891    pub fn run_with_timeout(self, timeout: Duration) -> Result<()> {
892        let start = Instant::now();
893        self.run_internal(Some(timeout), Some(start), None)
894    }
895
896    /// Run the program until a condition is met (useful for testing)
897    ///
898    /// The condition function is called after each update with the current model state.
899    /// The program exits when the condition returns true.
900    pub fn run_until<F>(self, condition: F) -> Result<()>
901    where
902        F: FnMut(&M) -> bool + 'static,
903    {
904        self.run_with_condition(Some(Box::new(condition)))
905    }
906
907    /// Run the program until it exits
908    pub fn run(self) -> Result<()> {
909        self.run_internal(None, None, None)
910    }
911
912    /// Run with a condition check
913    fn run_with_condition(self, condition: Option<ConditionCheck<M>>) -> Result<()> {
914        self.run_internal(None, None, condition)
915    }
916
917    /// Internal run implementation with optional timeout and condition
918    fn run_internal(
919        mut self,
920        timeout: Option<Duration>,
921        start_time: Option<Instant>,
922        mut condition: Option<ConditionCheck<M>>,
923    ) -> Result<()> {
924        // Mark as running
925        self.running.store(true, Ordering::SeqCst);
926
927        let (crossterm_tx, crossterm_rx) = mpsc::sync_channel(100);
928
929        // Use existing message channel if initialized, otherwise create new one
930        let (message_tx, message_rx) = if let Some(rx) = self.message_rx.take() {
931            // Channel was already initialized via init_async_bridge()
932            let tx = self
933                .message_tx
934                .as_ref()
935                .expect("message_tx should be Some when message_rx is Some")
936                .clone();
937            (tx, rx)
938        } else {
939            let (tx, rx) = mpsc::sync_channel(100);
940            self.message_tx = Some(tx.clone());
941            (tx, rx)
942        };
943
944        // Spawn input thread if not headless
945        if !self.options.headless && !self.options.without_renderer {
946            let running = Arc::clone(&self.running);
947            let force_quit = Arc::clone(&self.force_quit);
948
949            let input_thread = thread::spawn(move || loop {
950                if !running.load(Ordering::SeqCst) || force_quit.load(Ordering::SeqCst) {
951                    break;
952                }
953
954                if event::poll(Duration::from_millis(100)).unwrap_or(false) {
955                    if let Ok(event) = event::read() {
956                        let _ = crossterm_tx.send(event);
957                    }
958                }
959            });
960            self.input_thread = Some(input_thread);
961        }
962
963        // Run initial command with panic recovery
964        let init_cmd =
965            panic_recovery::safe_init(&mut self.model, self.options.panic_recovery_strategy);
966        if init_cmd.is_quit() {
967            // If init returns quit due to panic recovery, exit early
968            self.running.store(false, Ordering::SeqCst);
969            self.terminal_manager.cleanup().map_err(Error::from)?;
970            return Ok(());
971        }
972        if !init_cmd.is_noop() {
973            self.command_executor.execute(init_cmd, &message_tx);
974        }
975
976        // Main event loop
977        let tick_rate = Duration::from_millis(250);
978        loop {
979            if self.force_quit.load(Ordering::SeqCst) {
980                break;
981            }
982
983            // Check timeout if specified
984            if let (Some(timeout), Some(start)) = (timeout, start_time) {
985                if start.elapsed() >= timeout {
986                    break; // Timeout reached
987                }
988            }
989
990            // Process events with shorter timeout to allow checking conditions
991            let event_timeout = if timeout.is_some() {
992                Duration::from_millis(10) // Check more frequently when timeout is set
993            } else {
994                tick_rate
995            };
996            let event = if self.options.headless {
997                self.priority_processor
998                    .process_events_headless(&message_rx, event_timeout)
999            } else {
1000                self.priority_processor
1001                    .process_events(&message_rx, &crossterm_rx, event_timeout)
1002            };
1003
1004            if let Some(event) = event {
1005                // Check for quit
1006                if matches!(event, Event::Quit) {
1007                    break;
1008                }
1009
1010                // Apply filter if present
1011                let event = if let Some(ref filter) = self.filter {
1012                    filter(&self.model, event)
1013                } else {
1014                    Some(event)
1015                };
1016
1017                // Update model with panic recovery
1018                if let Some(event) = event {
1019                    let cmd = panic_recovery::safe_update(
1020                        &mut self.model,
1021                        event,
1022                        self.options.panic_recovery_strategy,
1023                    );
1024
1025                    // Check if command is quit
1026                    if cmd.is_quit() {
1027                        break;
1028                    }
1029
1030                    // Execute the command if it's not a no-op
1031                    if !cmd.is_noop() {
1032                        self.command_executor.execute(cmd, &message_tx);
1033                    }
1034                }
1035            }
1036
1037            // Check condition if specified
1038            if let Some(ref mut cond) = condition {
1039                if cond(&self.model) {
1040                    break; // Condition met
1041                }
1042            }
1043
1044            // Render if needed and FPS allows
1045            if !self.options.without_renderer && self.fps_limiter.should_render() {
1046                // Get the view string from the model with panic recovery
1047                let (view_string, should_quit) =
1048                    panic_recovery::safe_view(&self.model, self.options.panic_recovery_strategy);
1049
1050                if should_quit {
1051                    // View panic requested quit
1052                    break;
1053                }
1054
1055                // Render the string to the terminal
1056                self.string_renderer
1057                    .render(view_string)
1058                    .map_err(Error::from)?;
1059
1060                self.fps_limiter.mark_rendered();
1061            }
1062        }
1063
1064        // Log final statistics before cleanup
1065        log::info!(
1066            "Program shutting down. Final stats: {}",
1067            self.event_stats_string()
1068        );
1069
1070        // Cleanup
1071        self.running.store(false, Ordering::SeqCst);
1072        self.terminal_manager.cleanup().map_err(Error::from)?;
1073
1074        Ok(())
1075    }
1076}
1077
1078impl<M: Model> Drop for Program<M> {
1079    fn drop(&mut self) {
1080        let _ = self.terminal_manager.cleanup();
1081
1082        // Stop the input thread
1083        self.running.store(false, Ordering::SeqCst);
1084        self.force_quit.store(true, Ordering::SeqCst);
1085
1086        if let Some(thread) = self.input_thread.take() {
1087            let _ = thread.join();
1088        }
1089    }
1090}
1091
#[cfg(test)]
mod tests {
    use super::*;

    /// Every builder method on `ProgramOptions` must show up in the resulting
    /// option fields.
    #[test]
    fn test_program_options_all_methods() {
        let opts = ProgramOptions::default()
            .with_mouse_mode(MouseMode::CellMotion)
            .with_alt_screen(true)
            .with_bracketed_paste(true)
            .with_focus_reporting(true)
            .with_fps(120)
            .headless()
            .without_signal_handler()
            .without_renderer();

        // Valued settings.
        assert_eq!(opts.mouse_mode, MouseMode::CellMotion);
        assert_eq!(opts.fps, 120);

        // Flags that were switched on.
        assert!(opts.alt_screen);
        assert!(opts.bracketed_paste);
        assert!(opts.focus_reporting);
        assert!(opts.headless);
        assert!(opts.without_renderer);

        // Flag that was switched off.
        assert!(!opts.install_signal_handler);
    }

    /// Mouse reporting is disabled unless explicitly requested.
    #[test]
    fn test_mouse_mode_default() {
        assert_eq!(MouseMode::default(), MouseMode::None);
    }

    /// Constructing and immediately dropping a headless program must not
    /// panic or hang.
    #[test]
    fn test_program_drop() {
        use hojicha_core::core::Cmd;

        struct TestModel;
        impl Model for TestModel {
            type Message = ();
            fn update(&mut self, _: Event<Self::Message>) -> Cmd<Self::Message> {
                Cmd::noop()
            }
            fn view(&self) -> String {
                "TestModel".to_string()
            }
        }

        let headless = ProgramOptions::default().headless();
        let program = Program::with_options(TestModel, headless).unwrap();

        // Program should clean up when dropped
        drop(program);
    }

    /// Smoke test: the imperative helper methods can be called on a headless
    /// program without panicking.
    #[test]
    fn test_program_methods() {
        use hojicha_core::core::Cmd;

        struct TestModel;
        impl Model for TestModel {
            type Message = String;
            fn update(&mut self, _: Event<Self::Message>) -> Cmd<Self::Message> {
                Cmd::noop()
            }
            fn view(&self) -> String {
                "TestModel".to_string()
            }
        }

        let headless = ProgramOptions::default().headless();
        let mut app = Program::with_options(TestModel, headless).unwrap();

        // Output helpers.
        app.println("test");
        app.printf(format_args!("test {}", 42));

        // Graceful and forced shutdown requests.
        app.quit();
        app.kill();

        // Terminal hand-off and reclaim; results are irrelevant here.
        let _ = app.release_terminal();
        let _ = app.restore_terminal();
    }

    /// A filter closure can be attached to a program.
    #[test]
    fn test_program_with_filter() {
        use hojicha_core::core::Cmd;

        struct TestModel;
        impl Model for TestModel {
            type Message = i32;
            fn update(&mut self, _: Event<Self::Message>) -> Cmd<Self::Message> {
                Cmd::noop()
            }
            fn view(&self) -> String {
                "TestModel".to_string()
            }
        }

        let headless = ProgramOptions::default().headless();
        let app = Program::with_options(TestModel, headless).unwrap();

        // Drop user messages above 5, pass everything else through.
        let _filtered = app.with_filter(|_, event| {
            if matches!(event, Event::User(n) if n > 5) {
                None
            } else {
                Some(event)
            }
        });
    }
}