hojicha_runtime/program.rs
1//! The main Program struct that runs the application
2//!
3//! ## Async Task Spawning Methods
4//!
5//! The Program struct provides several methods for spawning async tasks,
6//! each optimized for different use cases:
7//!
8//! ### `commands::spawn()` - Simple Async Task Spawning
9//! - **Purpose**: Simple fire-and-forget async operations
10//! - **Use cases**: HTTP requests, file I/O, simple background work
11//! - **Lifecycle**: Runs to completion, sends optional result message
12//! - **Cancellation**: No built-in cancellation support
13//! - **Example**: One-time data fetch, file save operation
14//!
15//! ### `Program::spawn()` - Runtime-Managed Task Spawning
16//! - **Purpose**: Tasks that need to communicate with the running program
17//! - **Use cases**: Background tasks that send multiple messages
18//! - **Lifecycle**: Managed by program runtime, automatic cleanup
19//! - **Cancellation**: No built-in cancellation support
20//! - **Example**: Background data processing that sends progress updates
21//!
22//! ### `Program::spawn_cancellable()` - Cancellable Task Handles
23//! - **Purpose**: Long-running tasks that may need to be cancelled
24//! - **Use cases**: User-cancellable operations, background monitors
25//! - **Lifecycle**: Returns AsyncHandle for manual cancellation
26//! - **Cancellation**: Cooperative cancellation via CancellationToken
27//! - **Example**: File upload/download, long-running computation
28//!
29//! ### `Program::spawn_cancellable_cmd()` - Cancellable Message-Sending Tasks
30//! - **Purpose**: Long-running tasks that send messages AND can be cancelled
31//! - **Use cases**: Real-time data streams, periodic background tasks
32//! - **Lifecycle**: Returns AsyncHandle, integrated with message passing
33//! - **Cancellation**: Cooperative cancellation via CancellationToken
34//! - **Example**: Live data feeds, periodic health checks
35//!
36//! ## Choosing the Right Method
37//!
38//! ```ignore
39//! // Simple one-shot operation
40//! let cmd = commands::spawn(async {
41//! let data = fetch_data().await;
42//! Some(Msg::DataLoaded(data))
43//! });
44//!
45//! // Background task with progress updates
46//! program.spawn(async {
47//! process_large_file().await;
48//! Some(Msg::ProcessingComplete)
49//! });
50//!
51//! // User-cancellable operation
52//! let handle = program.spawn_cancellable(|token| async move {
53//! expensive_computation(token).await
54//! });
55//!
56//! // Real-time stream with cancellation
57//! let handle = program.spawn_cancellable_cmd(|token, sender| async move {
58//! while !token.is_cancelled() {
59//! let data = stream.next().await;
60//! sender.send(Event::User(Msg::StreamData(data)));
61//! }
62//! });
63//! ```
64
65// Module components
66mod command_executor;
67pub mod error_handler;
68mod event_processor;
69mod fps_limiter;
70mod priority_event_processor;
71mod terminal_manager;
72
73pub use command_executor::CommandExecutor;
74pub use event_processor::EventProcessor;
75pub use fps_limiter::FpsLimiter;
76pub use priority_event_processor::{
77 get_event_stats, EventStats, PriorityConfig, PriorityEventProcessor,
78};
79pub use terminal_manager::{TerminalConfig, TerminalManager};
80
81// Re-export the main types from program_old.rs for backward compatibility
82// We'll gradually migrate the implementation to use the extracted components
83
84use crate::async_handle::AsyncHandle;
85use crate::panic_recovery::{self, PanicRecoveryStrategy};
86use crate::resource_limits::ResourceLimits;
87use crate::string_renderer::StringRenderer;
88use crate::subscription::Subscription;
89use crossterm::event::{self};
90use hojicha_core::core::Model;
91use hojicha_core::error::{Error, Result};
92use hojicha_core::event::Event;
93use std::io::{self, Read, Write};
94use std::sync::atomic::{AtomicBool, Ordering};
95use std::sync::{mpsc, Arc};
96use std::thread;
97use std::time::{Duration, Instant};
98
99/// Type alias for message filter function
100type MessageFilter<M> = Box<
101 dyn Fn(&M, Event<<M as Model>::Message>) -> Option<Event<<M as Model>::Message>> + Send + Sync,
102>;
103
104/// Type alias for condition check function
105type ConditionCheck<M> = Box<dyn FnMut(&M) -> bool>;
106
107/// Mouse tracking mode
108#[derive(Debug, Clone, Copy, PartialEq, Default)]
109pub enum MouseMode {
110 /// No mouse tracking
111 #[default]
112 None,
113 /// Track mouse events only when buttons are pressed
114 CellMotion,
115 /// Track all mouse movement, even without button presses
116 AllMotion,
117}
118
119/// Options for configuring the program
120pub struct ProgramOptions {
121 /// Whether to use alternate screen
122 pub alt_screen: bool,
123 /// Mouse tracking mode
124 pub mouse_mode: MouseMode,
125 /// Enable bracketed paste mode
126 pub bracketed_paste: bool,
127 /// Enable focus reporting
128 pub focus_reporting: bool,
129 /// Frames per second (0 = unlimited)
130 pub fps: u16,
131 /// Run in headless mode without rendering
132 pub headless: bool,
133 /// Disable signal handlers
134 pub install_signal_handler: bool,
135 /// Disable renderer
136 pub without_renderer: bool,
137 /// Custom output writer
138 pub output: Option<Box<dyn Write + Send + Sync>>,
139 /// Custom input reader
140 pub input: Option<Box<dyn Read + Send + Sync>>,
141 /// Panic recovery strategy for Model methods
142 pub panic_recovery_strategy: PanicRecoveryStrategy,
143 /// Resource limits for async task execution
144 pub resource_limits: ResourceLimits,
145}
146
147impl ProgramOptions {
148 /// Create new default options
149 pub fn new() -> Self {
150 Self {
151 alt_screen: true,
152 mouse_mode: MouseMode::None,
153 bracketed_paste: false,
154 focus_reporting: false,
155 fps: 60,
156 headless: false,
157 install_signal_handler: true,
158 without_renderer: false,
159 output: None,
160 input: None,
161 panic_recovery_strategy: PanicRecoveryStrategy::default(),
162 resource_limits: ResourceLimits::default(),
163 }
164 }
165
166 /// Enable or disable alternate screen
167 pub fn with_alt_screen(mut self, enable: bool) -> Self {
168 self.alt_screen = enable;
169 self
170 }
171
172 /// Set mouse tracking mode
173 pub fn with_mouse_mode(mut self, mode: MouseMode) -> Self {
174 self.mouse_mode = mode;
175 self
176 }
177
178 /// Enable bracketed paste mode
179 pub fn with_bracketed_paste(mut self, enable: bool) -> Self {
180 self.bracketed_paste = enable;
181 self
182 }
183
184 /// Enable focus reporting
185 pub fn with_focus_reporting(mut self, enable: bool) -> Self {
186 self.focus_reporting = enable;
187 self
188 }
189
190 /// Set frames per second
191 pub fn with_fps(mut self, fps: u16) -> Self {
192 self.fps = fps;
193 self
194 }
195
196 /// Run in headless mode
197 pub fn headless(mut self) -> Self {
198 self.headless = true;
199 self
200 }
201
202 /// Disable signal handlers
203 pub fn without_signal_handler(mut self) -> Self {
204 self.install_signal_handler = false;
205 self
206 }
207
208 /// Disable renderer
209 pub fn without_renderer(mut self) -> Self {
210 self.without_renderer = true;
211 self
212 }
213
214 /// Set panic recovery strategy for Model methods
215 pub fn with_panic_recovery(mut self, strategy: PanicRecoveryStrategy) -> Self {
216 self.panic_recovery_strategy = strategy;
217 self
218 }
219
220 /// Set custom output
221 pub fn with_output(mut self, output: Box<dyn Write + Send + Sync>) -> Self {
222 self.output = Some(output);
223 self
224 }
225
226 /// Set custom input source
227 pub fn with_input(mut self, input: Box<dyn Read + Send + Sync>) -> Self {
228 self.input = Some(input);
229 self
230 }
231
232 /// Set input from a string (convenience method for testing)
233 pub fn with_input_string(mut self, input: &str) -> Self {
234 use std::io::Cursor;
235 self.input = Some(Box::new(Cursor::new(input.as_bytes().to_vec())));
236 self
237 }
238
239 /// Set resource limits for async task execution
240 pub fn with_resource_limits(mut self, limits: ResourceLimits) -> Self {
241 self.resource_limits = limits;
242 self
243 }
244}
245
246impl Default for ProgramOptions {
247 fn default() -> Self {
248 Self::new()
249 }
250}
251
252/// The main program that runs your application
253pub struct Program<M: Model> {
254 model: M,
255 options: ProgramOptions,
256 terminal_manager: TerminalManager,
257 command_executor: CommandExecutor<M::Message>,
258 fps_limiter: FpsLimiter,
259 message_tx: Option<mpsc::SyncSender<Event<M::Message>>>,
260 message_rx: Option<mpsc::Receiver<Event<M::Message>>>,
261 priority_processor: PriorityEventProcessor<M::Message>,
262 filter: Option<MessageFilter<M>>,
263 running: Arc<AtomicBool>,
264 force_quit: Arc<AtomicBool>,
265 input_thread: Option<thread::JoinHandle<()>>,
266 // String-based renderer
267 string_renderer: StringRenderer,
268}
269
270impl<M: Model> Program<M>
271where
272 M::Message: Clone,
273{
274 /// Create a new program with the given model
275 pub fn new(model: M) -> Result<Self> {
276 Self::with_options(model, ProgramOptions::default())
277 }
278
279 /// Create a new program with custom options
280 pub fn with_options(model: M, options: ProgramOptions) -> Result<Self> {
281 // Create terminal manager
282 let terminal_config = TerminalConfig {
283 alt_screen: options.alt_screen,
284 mouse_mode: options.mouse_mode,
285 bracketed_paste: options.bracketed_paste,
286 focus_reporting: options.focus_reporting,
287 headless: options.headless || options.without_renderer,
288 };
289 let terminal_manager = TerminalManager::new(terminal_config)?;
290
291 // Create command executor with resource limits
292 let command_executor =
293 CommandExecutor::with_resource_limits(options.resource_limits.clone())?;
294
295 // Create FPS limiter
296 let fps_limiter = FpsLimiter::new(options.fps);
297
298 // Create priority event processor with default config
299 let priority_processor = PriorityEventProcessor::new();
300
301 log::info!("Hojicha program initialized with priority event processing");
302
303 Ok(Self {
304 model,
305 options,
306 terminal_manager,
307 command_executor,
308 fps_limiter,
309 message_tx: None,
310 message_rx: None,
311 priority_processor,
312 filter: None,
313 running: Arc::new(AtomicBool::new(false)),
314 force_quit: Arc::new(AtomicBool::new(false)),
315 input_thread: None,
316 string_renderer: StringRenderer::new(),
317 })
318 }
319
320 /// Set a message filter function
321 pub fn with_filter<F>(mut self, filter: F) -> Self
322 where
323 F: Fn(&M, Event<M::Message>) -> Option<Event<M::Message>> + Send + Sync + 'static,
324 {
325 self.filter = Some(Box::new(filter));
326 self
327 }
328
329 /// Configure the priority event processor
330 pub fn with_priority_config(mut self, config: PriorityConfig) -> Self {
331 self.priority_processor = PriorityEventProcessor::with_config(config);
332 log::debug!("Priority processor configured with custom settings");
333 self
334 }
335
336 /// Get current event processing statistics
337 pub fn event_stats(&self) -> EventStats {
338 self.priority_processor.stats()
339 }
340
341 /// Get a formatted string of event statistics (useful for debugging)
342 pub fn event_stats_string(&self) -> String {
343 get_event_stats(&self.priority_processor)
344 }
345
346 /// Print formatted text to stderr
347 pub fn printf(&self, args: std::fmt::Arguments) {
348 eprint!("{args}");
349 let _ = io::stderr().flush();
350 }
351
352 /// Print a line to stderr
353 pub fn println(&self, text: &str) {
354 eprintln!("{text}");
355 let _ = io::stderr().flush();
356 }
357
358 /// Send a quit message to the running program
359 pub fn quit(&self) {
360 if let Some(tx) = &self.message_tx {
361 let _ = tx.send(Event::Quit);
362 }
363 }
364
365 /// Force kill the program immediately
366 ///
367 /// This method forcefully terminates the program without any cleanup.
368 /// Unlike `quit()` which sends a quit message through the event loop,
369 /// `kill()` immediately stops execution.
370 ///
371 /// # Use Cases
372 /// - Emergency shutdown when the program is unresponsive
373 /// - Testing scenarios requiring immediate termination
374 /// - Signal handlers that need to stop the program immediately
375 ///
376 /// # Example
377 /// ```ignore
378 /// // In a signal handler or emergency shutdown
379 /// program.kill();
380 /// ```
381 ///
382 /// # Note
383 /// This bypasses normal cleanup procedures. Prefer `quit()` for graceful shutdown.
384 pub fn kill(&self) {
385 self.force_quit.store(true, Ordering::SeqCst);
386 self.running.store(false, Ordering::SeqCst);
387 }
388
389 /// Get advanced performance metrics
390 pub fn metrics(&self) -> crate::metrics::AdvancedEventStats {
391 self.priority_processor.advanced_metrics()
392 }
393
394 /// Get resource usage statistics
395 pub fn resource_stats(&self) -> crate::resource_limits::ResourceStats {
396 self.command_executor.resource_stats()
397 }
398
399 /// Export metrics in JSON format
400 pub fn metrics_json(&self) -> String {
401 self.priority_processor.metrics_collector().export_json()
402 }
403
404 /// Export metrics in Prometheus format
405 pub fn metrics_prometheus(&self) -> String {
406 self.priority_processor
407 .metrics_collector()
408 .export_prometheus()
409 }
410
411 /// Export metrics in plain text format
412 pub fn metrics_text(&self) -> String {
413 self.priority_processor.metrics_collector().export_text()
414 }
415
416 /// Display metrics dashboard (for debugging)
417 pub fn metrics_dashboard(&self) -> String {
418 let stats = self.priority_processor.advanced_metrics();
419 crate::metrics::display_dashboard(&stats)
420 }
421
422 /// Reset all metrics
423 pub fn reset_metrics(&self) {
424 self.priority_processor.reset_stats();
425 }
426
427 /// Dynamically resize the event queue
428 pub fn resize_queue(&self, new_size: usize) -> Result<()> {
429 self.priority_processor
430 .resize_queue(new_size)
431 .map_err(|e| Error::Custom(Box::new(e)))
432 }
433
434 /// Get current queue capacity
435 pub fn queue_capacity(&self) -> usize {
436 self.priority_processor.queue_capacity()
437 }
438
439 /// Enable auto-scaling with the specified configuration
440 pub fn enable_auto_scaling(
441 &mut self,
442 config: crate::queue_scaling::AutoScaleConfig,
443 ) -> &mut Self {
444 self.priority_processor.enable_auto_scaling(config);
445 self
446 }
447
448 /// Enable auto-scaling with default configuration
449 pub fn with_auto_scaling(mut self) -> Self {
450 self.priority_processor
451 .enable_auto_scaling(crate::queue_scaling::AutoScaleConfig::default());
452 self
453 }
454
455 /// Disable auto-scaling
456 pub fn disable_auto_scaling(&mut self) -> &mut Self {
457 self.priority_processor.disable_auto_scaling();
458 self
459 }
460
461 /// Get a sender for injecting events from external sources
462 ///
463 /// This allows external async tasks to send messages to the program's event loop.
464 /// The sender is thread-safe and can be cloned for use in multiple threads.
465 ///
466 /// # Example
467 /// ```ignore
468 /// let mut program = Program::new(model)?;
469 /// let sender = program.sender();
470 ///
471 /// std::thread::spawn(move || {
472 /// loop {
473 /// std::thread::sleep(Duration::from_secs(1));
474 /// let _ = sender.send(Event::User(Msg::Tick));
475 /// }
476 /// });
477 ///
478 /// program.run()?;
479 /// ```
480 pub fn sender(&self) -> Option<mpsc::SyncSender<Event<M::Message>>> {
481 self.message_tx.clone()
482 }
483
484 /// Send a user message to the program
485 ///
486 /// Convenience method that wraps the message in Event::User.
487 ///
488 /// # Example
489 /// ```ignore
490 /// program.send_message(Msg::DataLoaded(data))?;
491 /// ```
492 pub fn send_message(&self, msg: M::Message) -> Result<()> {
493 self.message_tx
494 .as_ref()
495 .ok_or_else(|| {
496 Error::from(io::Error::new(
497 io::ErrorKind::NotConnected,
498 "Program not running",
499 ))
500 })?
501 .send(Event::User(msg))
502 .map_err(|_| {
503 Error::from(io::Error::new(
504 io::ErrorKind::BrokenPipe,
505 "Receiver disconnected",
506 ))
507 })
508 }
509
510 /// Wait for the program to finish
511 pub fn wait(&self) {
512 while !self.running.load(Ordering::SeqCst) && !self.force_quit.load(Ordering::SeqCst) {
513 thread::sleep(Duration::from_millis(1));
514 }
515 while self.running.load(Ordering::SeqCst) && !self.force_quit.load(Ordering::SeqCst) {
516 thread::sleep(Duration::from_millis(10));
517 }
518 }
519
520 /// Release the terminal
521 pub fn release_terminal(&mut self) -> Result<()> {
522 self.terminal_manager.release().map_err(Error::from)
523 }
524
525 /// Restore the terminal
526 pub fn restore_terminal(&mut self) -> Result<()> {
527 self.terminal_manager.restore().map_err(Error::from)
528 }
529
530 /// Initialize the async message bridge for external event injection
531 ///
532 /// This sets up channels for external systems to send messages into the program's
533 /// event loop. Must be called before `run()` if you need external message injection.
534 ///
535 /// # Use Cases
536 ///
537 /// - **WebSocket Integration**: Forward messages from WebSocket connections
538 /// - **File Watchers**: Send events when files change
539 /// - **Timers**: Implement custom scheduling beyond tick/every
540 /// - **Database Listeners**: Forward change notifications
541 /// - **IPC**: Receive messages from other processes
542 /// - **HTTP Responses**: Send results from async HTTP requests
543 ///
544 /// # Thread Safety
545 ///
546 /// The returned sender can be cloned and shared across multiple threads safely.
547 /// Messages are queued with a capacity of 100 by default.
548 ///
549 /// # Example
550 ///
551 /// ```no_run
552 /// use hojicha_runtime::{Program, Event};
553 /// use hojicha_core::{Model, Cmd};
554 ///
555 /// #[derive(Clone, Debug)]
556 /// struct MyModel;
557 ///
558 /// #[derive(Clone, Debug)]
559 /// enum MyMsg { Hello }
560 ///
561 /// impl Model for MyModel {
562 /// type Message = MyMsg;
563 /// fn init(&mut self) -> Cmd<Self::Message> { Cmd::noop() }
564 /// fn update(&mut self, _msg: Event<Self::Message>) -> Cmd<Self::Message> { Cmd::noop() }
565 /// fn view(&self) -> String { "test".to_string() }
566 /// }
567 ///
568 /// let mut program = Program::new(MyModel).unwrap();
569 /// let sender = program.init_async_bridge();
570 ///
571 /// // Send an external event from another thread
572 /// sender.send(Event::User(MyMsg::Hello)).ok();
573 /// ```
574 pub fn init_async_bridge(&mut self) -> mpsc::SyncSender<Event<M::Message>> {
575 if self.message_tx.is_none() {
576 let (message_tx, message_rx) = mpsc::sync_channel(100);
577 self.message_tx = Some(message_tx.clone());
578 self.message_rx = Some(message_rx);
579 message_tx
580 } else {
581 // Safe to unwrap here because we just checked is_none() above
582 self.message_tx
583 .as_ref()
584 .expect("message_tx should be Some after init check")
585 .clone()
586 }
587 }
588
589 /// Subscribe to an async stream of events
590 ///
591 /// Connects any `futures::Stream` to your program's event loop. Each item from
592 /// the stream is automatically wrapped in `Event::User` and sent to your model's
593 /// update function.
594 ///
595 /// # Use Cases
596 ///
597 /// - **WebSocket/SSE**: Stream real-time messages
598 /// - **File Watching**: Monitor file system changes
599 /// - **Periodic Tasks**: Custom intervals and scheduling
600 /// - **Database Changes**: Listen to change streams
601 /// - **Sensor Data**: Process continuous data streams
602 ///
603 /// # Cancellation
604 ///
605 /// The returned `Subscription` handle allows graceful cancellation. The subscription
606 /// is also automatically cancelled when dropped.
607 ///
608 /// # Examples
609 ///
610 /// ## Timer Stream
611 /// ```ignore
612 /// use tokio_stream::wrappers::IntervalStream;
613 /// use std::time::Duration;
614 ///
615 /// let interval = tokio::time::interval(Duration::from_secs(1));
616 /// let stream = IntervalStream::new(interval)
617 /// .map(|_| Msg::Tick);
618 /// let subscription = program.subscribe(stream);
619 /// ```
620 ///
621 /// ## WebSocket Stream
622 /// ```ignore
623 /// let ws_stream = websocket.messages()
624 /// .filter_map(|msg| msg.ok())
625 /// .map(|msg| Msg::WebSocketMessage(msg));
626 /// let subscription = program.subscribe(ws_stream);
627 /// ```
628 ///
629 /// ## File Watcher Stream
630 /// ```ignore
631 /// let file_stream = watch_file("/path/to/file")
632 /// .map(|event| Msg::FileChanged(event));
633 /// let subscription = program.subscribe(file_stream);
634 /// ```
635 pub fn subscribe<S>(&mut self, stream: S) -> Subscription
636 where
637 S: futures::Stream<Item = M::Message> + Send + 'static,
638 M::Message: Send + 'static,
639 {
640 use futures::StreamExt;
641 use tokio_util::sync::CancellationToken;
642
643 // Ensure we have a message channel
644 if self.message_tx.is_none() {
645 self.init_async_bridge();
646 }
647
648 let sender = self
649 .message_tx
650 .as_ref()
651 .expect("message_tx should be Some after init_async_bridge")
652 .clone();
653 let cancel_token = CancellationToken::new();
654 let cancel_clone = cancel_token.clone();
655
656 // Spawn task to poll the stream
657 let handle = self.command_executor.spawn(async move {
658 tokio::pin!(stream);
659
660 loop {
661 tokio::select! {
662 _ = cancel_clone.cancelled() => {
663 break; // Subscription was cancelled
664 }
665 item = stream.next() => {
666 match item {
667 Some(msg) => {
668 if sender.send(Event::User(msg)).is_err() {
669 break; // Program has shut down
670 }
671 }
672 None => {
673 break; // Stream completed
674 }
675 }
676 }
677 }
678 }
679 });
680
681 Subscription::new(handle, cancel_token)
682 }
683
684 /// Spawn a cancellable async operation
685 ///
686 /// Spawns a long-running async task with cooperative cancellation support.
687 /// The task receives a `CancellationToken` for checking cancellation status.
688 ///
689 /// # Use Cases
690 ///
691 /// - **Background Processing**: Data analysis, file processing
692 /// - **Network Operations**: Long polling, streaming downloads
693 /// - **Periodic Tasks**: Health checks, metrics collection
694 /// - **Resource Monitoring**: CPU/memory monitoring
695 /// - **Cleanup Tasks**: Temporary file cleanup, cache management
696 ///
697 /// # Cancellation Pattern
698 ///
699 /// Tasks should periodically check the cancellation token and exit gracefully
700 /// when cancelled. Use `tokio::select!` for responsive cancellation.
701 ///
702 /// # Examples
703 ///
704 /// ## Background File Processing
705 /// ```ignore
706 /// let handle = program.spawn_cancellable(|token| async move {
707 /// for file in large_file_list {
708 /// if token.is_cancelled() {
709 /// return Err("Cancelled");
710 /// }
711 /// process_file(file).await?;
712 /// }
713 /// Ok("All files processed")
714 /// });
715 /// ```
716 ///
717 /// ## Long Polling
718 /// ```ignore
719 /// let handle = program.spawn_cancellable(|token| async move {
720 /// loop {
721 /// tokio::select! {
722 /// _ = token.cancelled() => {
723 /// break Ok("Polling cancelled");
724 /// }
725 /// result = poll_server() => {
726 /// handle_result(result)?;
727 /// }
728 /// }
729 /// }
730 /// });
731 ///
732 /// // Cancel when user navigates away
733 /// if user_navigated_away {
734 /// handle.cancel().await;
735 /// }
736 /// ```
737 pub fn spawn_cancellable<F, Fut, T>(&self, f: F) -> AsyncHandle<T>
738 where
739 F: FnOnce(tokio_util::sync::CancellationToken) -> Fut,
740 Fut: std::future::Future<Output = T> + Send + 'static,
741 T: Send + 'static,
742 {
743 use tokio_util::sync::CancellationToken;
744
745 let cancel_token = CancellationToken::new();
746 let token_clone = cancel_token.clone();
747
748 let handle = self.command_executor.spawn(f(token_clone));
749
750 AsyncHandle::new(handle, cancel_token)
751 }
752
753 /// Spawn a cancellable async operation that sends messages
754 ///
755 /// Similar to spawn_cancellable but specifically for operations that produce
756 /// messages for the program.
757 ///
758 /// # Example
759 /// ```ignore
760 /// let handle = program.spawn_cancellable_cmd(|token, sender| async move {
761 /// while !token.is_cancelled() {
762 /// let data = fetch_data().await;
763 /// let _ = sender.send(Event::User(Msg::DataReceived(data)));
764 /// tokio::time::sleep(Duration::from_secs(1)).await;
765 /// }
766 /// });
767 /// ```
768 pub fn spawn_cancellable_cmd<F, Fut>(&mut self, f: F) -> AsyncHandle<()>
769 where
770 F: FnOnce(tokio_util::sync::CancellationToken, mpsc::SyncSender<Event<M::Message>>) -> Fut,
771 Fut: std::future::Future<Output = ()> + Send + 'static,
772 M::Message: Send + 'static,
773 {
774 use tokio_util::sync::CancellationToken;
775
776 // Ensure we have a message channel
777 if self.message_tx.is_none() {
778 self.init_async_bridge();
779 }
780
781 let sender = self
782 .message_tx
783 .as_ref()
784 .expect("message_tx should be Some after init_async_bridge")
785 .clone();
786 let cancel_token = CancellationToken::new();
787 let token_clone = cancel_token.clone();
788
789 let handle = self.command_executor.spawn(f(token_clone, sender));
790
791 AsyncHandle::new(handle, cancel_token)
792 }
793
794 /// Spawn a simple async task that produces a message
795 ///
796 /// This is a simplified version of spawn_cancellable for tasks that don't need
797 /// cancellation support. The task runs to completion and sends its result.
798 ///
799 /// # Example
800 /// ```ignore
801 /// program.spawn(async {
802 /// let data = fetch_data().await;
803 /// Some(Msg::DataLoaded(data))
804 /// });
805 /// ```
806 pub fn spawn<Fut>(&mut self, fut: Fut)
807 where
808 Fut: std::future::Future<Output = Option<M::Message>> + Send + 'static,
809 M::Message: Send + 'static,
810 {
811 // Ensure we have a message channel
812 if self.message_tx.is_none() {
813 self.init_async_bridge();
814 }
815
816 let sender = self
817 .message_tx
818 .as_ref()
819 .expect("message_tx should be Some after init_async_bridge")
820 .clone();
821
822 self.command_executor.spawn(async move {
823 if let Some(msg) = fut.await {
824 let _ = sender.send(Event::User(msg));
825 }
826 });
827 }
828
829 /// Subscribe to an interval timer with a simple callback
830 ///
831 /// This is a simplified API for the common case of periodic events.
832 ///
833 /// # Example
834 /// ```ignore
835 /// program.subscribe_interval(Duration::from_secs(1), || Msg::Tick);
836 /// ```
837 pub fn subscribe_interval<F>(&mut self, interval: Duration, mut callback: F) -> Subscription
838 where
839 F: FnMut() -> M::Message + Send + 'static,
840 M::Message: Send + 'static,
841 {
842 let stream = async_stream::stream! {
843 let mut interval = tokio::time::interval(interval);
844 loop {
845 interval.tick().await;
846 yield callback();
847 }
848 };
849
850 self.subscribe(stream)
851 }
852
853 /// Subscribe to a stream with automatic error handling
854 ///
855 /// This helper automatically converts stream items and errors to messages.
856 ///
857 /// # Example
858 /// ```ignore
859 /// program.subscribe_with_error(
860 /// my_stream,
861 /// |item| Msg::Data(item),
862 /// |error| Msg::Error(error.to_string())
863 /// );
864 /// ```
865 pub fn subscribe_with_error<S, T, E, F, G>(
866 &mut self,
867 stream: S,
868 on_item: F,
869 on_error: G,
870 ) -> Subscription
871 where
872 S: futures::Stream<Item = std::result::Result<T, E>> + Send + 'static,
873 F: Fn(T) -> M::Message + Send + 'static,
874 G: Fn(E) -> M::Message + Send + 'static,
875 M::Message: Send + 'static,
876 {
877 use futures::StreamExt;
878
879 let mapped_stream = stream.map(move |result| match result {
880 Ok(item) => on_item(item),
881 Err(error) => on_error(error),
882 });
883
884 self.subscribe(mapped_stream)
885 }
886
887 /// Run the program with a timeout (useful for testing)
888 ///
889 /// The program will automatically exit after the specified duration.
890 /// Returns Ok(()) if the timeout was reached or the program exited normally.
891 pub fn run_with_timeout(self, timeout: Duration) -> Result<()> {
892 let start = Instant::now();
893 self.run_internal(Some(timeout), Some(start), None)
894 }
895
896 /// Run the program until a condition is met (useful for testing)
897 ///
898 /// The condition function is called after each update with the current model state.
899 /// The program exits when the condition returns true.
900 pub fn run_until<F>(self, condition: F) -> Result<()>
901 where
902 F: FnMut(&M) -> bool + 'static,
903 {
904 self.run_with_condition(Some(Box::new(condition)))
905 }
906
907 /// Run the program until it exits
908 pub fn run(self) -> Result<()> {
909 self.run_internal(None, None, None)
910 }
911
912 /// Run with a condition check
913 fn run_with_condition(self, condition: Option<ConditionCheck<M>>) -> Result<()> {
914 self.run_internal(None, None, condition)
915 }
916
917 /// Internal run implementation with optional timeout and condition
918 fn run_internal(
919 mut self,
920 timeout: Option<Duration>,
921 start_time: Option<Instant>,
922 mut condition: Option<ConditionCheck<M>>,
923 ) -> Result<()> {
924 // Mark as running
925 self.running.store(true, Ordering::SeqCst);
926
927 let (crossterm_tx, crossterm_rx) = mpsc::sync_channel(100);
928
929 // Use existing message channel if initialized, otherwise create new one
930 let (message_tx, message_rx) = if let Some(rx) = self.message_rx.take() {
931 // Channel was already initialized via init_async_bridge()
932 let tx = self
933 .message_tx
934 .as_ref()
935 .expect("message_tx should be Some when message_rx is Some")
936 .clone();
937 (tx, rx)
938 } else {
939 let (tx, rx) = mpsc::sync_channel(100);
940 self.message_tx = Some(tx.clone());
941 (tx, rx)
942 };
943
944 // Spawn input thread if not headless
945 if !self.options.headless && !self.options.without_renderer {
946 let running = Arc::clone(&self.running);
947 let force_quit = Arc::clone(&self.force_quit);
948
949 let input_thread = thread::spawn(move || loop {
950 if !running.load(Ordering::SeqCst) || force_quit.load(Ordering::SeqCst) {
951 break;
952 }
953
954 if event::poll(Duration::from_millis(100)).unwrap_or(false) {
955 if let Ok(event) = event::read() {
956 let _ = crossterm_tx.send(event);
957 }
958 }
959 });
960 self.input_thread = Some(input_thread);
961 }
962
963 // Run initial command with panic recovery
964 let init_cmd =
965 panic_recovery::safe_init(&mut self.model, self.options.panic_recovery_strategy);
966 if init_cmd.is_quit() {
967 // If init returns quit due to panic recovery, exit early
968 self.running.store(false, Ordering::SeqCst);
969 self.terminal_manager.cleanup().map_err(Error::from)?;
970 return Ok(());
971 }
972 if !init_cmd.is_noop() {
973 self.command_executor.execute(init_cmd, &message_tx);
974 }
975
976 // Main event loop
977 let tick_rate = Duration::from_millis(250);
978 loop {
979 if self.force_quit.load(Ordering::SeqCst) {
980 break;
981 }
982
983 // Check timeout if specified
984 if let (Some(timeout), Some(start)) = (timeout, start_time) {
985 if start.elapsed() >= timeout {
986 break; // Timeout reached
987 }
988 }
989
990 // Process events with shorter timeout to allow checking conditions
991 let event_timeout = if timeout.is_some() {
992 Duration::from_millis(10) // Check more frequently when timeout is set
993 } else {
994 tick_rate
995 };
996 let event = if self.options.headless {
997 self.priority_processor
998 .process_events_headless(&message_rx, event_timeout)
999 } else {
1000 self.priority_processor
1001 .process_events(&message_rx, &crossterm_rx, event_timeout)
1002 };
1003
1004 if let Some(event) = event {
1005 // Check for quit
1006 if matches!(event, Event::Quit) {
1007 break;
1008 }
1009
1010 // Apply filter if present
1011 let event = if let Some(ref filter) = self.filter {
1012 filter(&self.model, event)
1013 } else {
1014 Some(event)
1015 };
1016
1017 // Update model with panic recovery
1018 if let Some(event) = event {
1019 let cmd = panic_recovery::safe_update(
1020 &mut self.model,
1021 event,
1022 self.options.panic_recovery_strategy,
1023 );
1024
1025 // Check if command is quit
1026 if cmd.is_quit() {
1027 break;
1028 }
1029
1030 // Execute the command if it's not a no-op
1031 if !cmd.is_noop() {
1032 self.command_executor.execute(cmd, &message_tx);
1033 }
1034 }
1035 }
1036
1037 // Check condition if specified
1038 if let Some(ref mut cond) = condition {
1039 if cond(&self.model) {
1040 break; // Condition met
1041 }
1042 }
1043
1044 // Render if needed and FPS allows
1045 if !self.options.without_renderer && self.fps_limiter.should_render() {
1046 // Get the view string from the model with panic recovery
1047 let (view_string, should_quit) =
1048 panic_recovery::safe_view(&self.model, self.options.panic_recovery_strategy);
1049
1050 if should_quit {
1051 // View panic requested quit
1052 break;
1053 }
1054
1055 // Render the string to the terminal
1056 self.string_renderer
1057 .render(view_string)
1058 .map_err(Error::from)?;
1059
1060 self.fps_limiter.mark_rendered();
1061 }
1062 }
1063
1064 // Log final statistics before cleanup
1065 log::info!(
1066 "Program shutting down. Final stats: {}",
1067 self.event_stats_string()
1068 );
1069
1070 // Cleanup
1071 self.running.store(false, Ordering::SeqCst);
1072 self.terminal_manager.cleanup().map_err(Error::from)?;
1073
1074 Ok(())
1075 }
1076}
1077
1078impl<M: Model> Drop for Program<M> {
1079 fn drop(&mut self) {
1080 let _ = self.terminal_manager.cleanup();
1081
1082 // Stop the input thread
1083 self.running.store(false, Ordering::SeqCst);
1084 self.force_quit.store(true, Ordering::SeqCst);
1085
1086 if let Some(thread) = self.input_thread.take() {
1087 let _ = thread.join();
1088 }
1089 }
1090}
1091
1092#[cfg(test)]
1093mod tests {
1094 use super::*;
1095
1096 #[test]
1097 fn test_program_options_all_methods() {
1098 let options = ProgramOptions::default()
1099 .with_mouse_mode(MouseMode::CellMotion)
1100 .with_alt_screen(true)
1101 .with_bracketed_paste(true)
1102 .with_focus_reporting(true)
1103 .with_fps(120)
1104 .headless()
1105 .without_signal_handler()
1106 .without_renderer();
1107
1108 assert_eq!(options.mouse_mode, MouseMode::CellMotion);
1109 assert!(options.alt_screen);
1110 assert!(options.bracketed_paste);
1111 assert!(options.focus_reporting);
1112 assert_eq!(options.fps, 120);
1113 assert!(options.headless);
1114 assert!(!options.install_signal_handler);
1115 assert!(options.without_renderer);
1116 }
1117
1118 #[test]
1119 fn test_mouse_mode_default() {
1120 assert_eq!(MouseMode::default(), MouseMode::None);
1121 }
1122
1123 #[test]
1124 fn test_program_drop() {
1125 use hojicha_core::core::Cmd;
1126
1127 struct TestModel;
1128 impl Model for TestModel {
1129 type Message = ();
1130 fn update(&mut self, _: Event<Self::Message>) -> Cmd<Self::Message> {
1131 Cmd::noop()
1132 }
1133 fn view(&self) -> String {
1134 "TestModel".to_string()
1135 }
1136 }
1137
1138 let options = ProgramOptions::default().headless();
1139 {
1140 let _program = Program::with_options(TestModel, options).unwrap();
1141 // Program should clean up when dropped
1142 }
1143 }
1144
1145 #[test]
1146 fn test_program_methods() {
1147 use hojicha_core::core::Cmd;
1148
1149 struct TestModel;
1150 impl Model for TestModel {
1151 type Message = String;
1152 fn update(&mut self, _: Event<Self::Message>) -> Cmd<Self::Message> {
1153 Cmd::noop()
1154 }
1155 fn view(&self) -> String {
1156 "TestModel".to_string()
1157 }
1158 }
1159
1160 let options = ProgramOptions::default().headless();
1161 let mut program = Program::with_options(TestModel, options).unwrap();
1162
1163 // Test println and printf
1164 program.println("test");
1165 program.printf(format_args!("test {}", 42));
1166
1167 // Test quit and kill
1168 program.quit();
1169 program.kill();
1170
1171 // Test release and restore terminal
1172 let _ = program.release_terminal();
1173 let _ = program.restore_terminal();
1174 }
1175
1176 #[test]
1177 fn test_program_with_filter() {
1178 use hojicha_core::core::Cmd;
1179
1180 struct TestModel;
1181 impl Model for TestModel {
1182 type Message = i32;
1183 fn update(&mut self, _: Event<Self::Message>) -> Cmd<Self::Message> {
1184 Cmd::noop()
1185 }
1186 fn view(&self) -> String {
1187 "TestModel".to_string()
1188 }
1189 }
1190
1191 let options = ProgramOptions::default().headless();
1192 let program = Program::with_options(TestModel, options).unwrap();
1193
1194 let _filtered = program.with_filter(|_, event| match event {
1195 Event::User(n) if n > 5 => None,
1196 _ => Some(event),
1197 });
1198 }
1199}