1use std::io;
10use std::io::Stdout;
11use std::sync::OnceLock;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, Mutex};
14use std::thread;
15use std::time::Duration;
16
17use crossterm::terminal;
18
19use crate::Control;
20use crate::console::OverflowMethod;
21use crate::style::Style;
22use crate::text::Text;
23use crate::{Console, JustifyMethod, Renderable};
24
25#[cfg(unix)]
26use std::fs::File;
27#[cfg(unix)]
28use std::io::{BufRead, BufReader};
29#[cfg(unix)]
30use std::os::fd::{FromRawFd, RawFd};
31
/// Vertical overflow policy handed to the console when a live display starts.
///
/// The variants are only passed through in this file (to `Console::live_start`
/// and `Console::live_set_vertical_overflow`); how each one is rendered is
/// decided by the console.
///
/// The default is [`VerticalOverflowMethod::Ellipsis`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum VerticalOverflowMethod {
    /// NOTE(review): presumably cuts off content that does not fit — confirm
    /// against the console implementation.
    Crop,
    /// The default policy.
    #[default]
    Ellipsis,
    /// The policy `Live::stop` switches a root display to before its final render.
    Visible,
}
44
/// Configuration for a [`Live`] display.
#[derive(Debug, Clone)]
pub struct LiveOptions {
    /// When `true`, a root display switches the console to the alternate screen
    /// on start. `Live::with_console` also forces `transient` to `true` in this case.
    pub screen: bool,
    /// When `true`, a root display spawns a background thread on start that
    /// re-renders periodically.
    pub auto_refresh: bool,
    /// Refresh rate of the background thread, in refreshes per second.
    /// `Live::with_console` panics unless this is greater than zero.
    pub refresh_per_second: f64,
    /// When `false`, `Live::stop` prints the final renderable for nested and
    /// non-interactive displays; when `true`, a root display that is not on the
    /// alternate screen emits the console's cursor-restore controls instead.
    pub transient: bool,
    /// Overflow policy passed to the console when the display starts.
    pub vertical_overflow: VerticalOverflowMethod,
    /// On Unix, route file descriptor 1 through a pipe while a root display is
    /// running and replay the captured lines through the console. No-op elsewhere.
    pub redirect_stdout: bool,
    /// Same as `redirect_stdout`, but for file descriptor 2.
    pub redirect_stderr: bool,
}
57
58impl Default for LiveOptions {
59 fn default() -> Self {
60 Self {
61 screen: false,
62 auto_refresh: true,
63 refresh_per_second: 4.0,
64 transient: false,
65 vertical_overflow: VerticalOverflowMethod::Ellipsis,
66 redirect_stdout: false,
67 redirect_stderr: false,
68 }
69 }
70}
71
/// Mutable bookkeeping for a [`Live`] display, shared with the refresh thread
/// behind a mutex.
struct LiveState {
    /// Effective options (with `transient` already normalised by `with_console`).
    options: LiveOptions,
    /// Set by `start` (also on the non-interactive path) and cleared by `stop`.
    started: bool,
    /// Identifier returned by `Console::live_start`; `None` while the display is
    /// not registered with the console (not started, or non-interactive).
    live_id: Option<usize>,
    /// Second value returned by `Console::live_start`. Only a root display
    /// touches the cursor / alternate screen, installs redirects, and spawns
    /// the refresh thread.
    is_root: bool,
    /// Result of `Console::set_alt_screen(true)` on start; tells `stop` whether
    /// the alternate screen has to be left again.
    alt_screen: bool,
    /// Renderable waiting to be handed to the console: the initial one until
    /// `start`, or the latest one passed to `update` while no live id exists.
    pending_renderable: Option<Box<dyn Renderable + Send + Sync>>,
}
80
/// Handle for a live-updating display rendered through a shared [`Console`].
///
/// Dropping the handle calls `stop`.
pub struct Live {
    /// Console shared with the refresh thread and the redirect workers.
    console: Arc<Mutex<Console<Stdout>>>,
    /// Lifecycle bookkeeping, shared with the refresh thread.
    state: Arc<Mutex<LiveState>>,
    /// Set by `stop` to make the refresh thread exit; cleared when it is spawned.
    stop_flag: Arc<AtomicBool>,
    /// `true` only while the display is registered with an interactive console.
    started_flag: Arc<AtomicBool>,
    /// Background refresh thread, if one is running.
    refresh_thread: Option<thread::JoinHandle<()>>,
    /// Active stream redirections, restored by `stop_redirects`.
    #[cfg(unix)]
    redirects: Vec<StreamRedirect>,
    /// Optional callback the refresh thread calls on every tick to obtain a
    /// fresh renderable.
    get_renderable: Option<Arc<dyn Fn() -> Box<dyn Renderable + Send + Sync> + Send + Sync>>,
}
98
/// Bookkeeping for one redirected output stream.
#[cfg(unix)]
struct StreamRedirect {
    /// Descriptor that was redirected (1 or 2 in this file).
    target_fd: RawFd,
    /// Duplicate of `target_fd` taken before the redirect; used to restore it.
    original_fd: RawFd,
    /// Write end of the pipe that `target_fd` was pointed at.
    pipe_write_fd: RawFd,
    /// Thread that reads the pipe and replays captured lines through the console.
    worker: thread::JoinHandle<()>,
}
106
// Hand-written bindings to the C library descriptor functions used for stream
// redirection. None of them is marked `safe`, so every call site needs `unsafe`.
// Callers in this file treat a return value of -1 from `dup`, `dup2` and `pipe`
// as failure and read the cause via `io::Error::last_os_error()`.
#[cfg(unix)]
unsafe extern "C" {
    // Used to take a backup copy of a descriptor before redirecting it.
    fn dup(oldfd: i32) -> i32;
    // Used to point `newfd` at whatever `oldfd` refers to.
    fn dup2(oldfd: i32, newfd: i32) -> i32;
    // `fds` must point to two writable `i32`s; callers read index 0 as the
    // read end and index 1 as the write end.
    fn pipe(fds: *mut i32) -> i32;
    // Return value is ignored by every caller in this file.
    fn close(fd: i32) -> i32;
}
114
/// Returns the process-wide mutex that serialises descriptor swapping
/// (`dup2` on the redirected streams) between redirect workers and
/// `stop_redirects`. The mutex is created lazily on first use.
#[cfg(unix)]
fn stream_redirect_lock() -> &'static Mutex<()> {
    static REDIRECT_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
    REDIRECT_LOCK.get_or_init(Mutex::<()>::default)
}
120
121impl Live {
122 pub fn new(renderable: Box<dyn Renderable + Send + Sync>) -> Self {
123 Self::with_console(renderable, Console::new(), LiveOptions::default())
124 }
125
126 pub fn with_options(
127 renderable: Box<dyn Renderable + Send + Sync>,
128 options: LiveOptions,
129 ) -> Self {
130 Self::with_console(renderable, Console::new(), options)
131 }
132
133 pub fn with_console(
134 renderable: Box<dyn Renderable + Send + Sync>,
135 console: Console<Stdout>,
136 options: LiveOptions,
137 ) -> Self {
138 assert!(
139 options.refresh_per_second > 0.0,
140 "refresh_per_second must be > 0"
141 );
142
143 let transient = if options.screen {
144 true
145 } else {
146 options.transient
147 };
148 let options = LiveOptions {
149 transient,
150 ..options
151 };
152 let state = LiveState {
153 options,
154 started: false,
155 live_id: None,
156 is_root: false,
157 alt_screen: false,
158 pending_renderable: Some(renderable),
159 };
160
161 Live {
162 console: Arc::new(Mutex::new(console)),
163 state: Arc::new(Mutex::new(state)),
164 stop_flag: Arc::new(AtomicBool::new(false)),
165 started_flag: Arc::new(AtomicBool::new(false)),
166 refresh_thread: None,
167 #[cfg(unix)]
168 redirects: Vec::new(),
169 get_renderable: None,
170 }
171 }
172
173 pub fn is_started(&self) -> bool {
174 self.started_flag.load(Ordering::SeqCst)
175 }
176
177 pub fn with_get_renderable(
182 mut self,
183 f: impl Fn() -> Box<dyn Renderable + Send + Sync> + Send + Sync + 'static,
184 ) -> Self {
185 self.get_renderable = Some(Arc::new(f));
186 self
187 }
188
189 pub(crate) fn started_flag(&self) -> Arc<AtomicBool> {
190 self.started_flag.clone()
191 }
192
193 pub(crate) fn refresh_per_second(&self) -> f64 {
194 self.state
195 .lock()
196 .expect("live state mutex poisoned")
197 .options
198 .refresh_per_second
199 }
200
201 pub fn start(&mut self, refresh: bool) -> io::Result<()> {
202 let mut state = self.state.lock().expect("live state mutex poisoned");
203 if state.started {
204 return Ok(());
205 }
206
207 let mut console = self.console.lock().expect("console mutex poisoned");
208 sync_terminal_size(&mut console);
209
210 let interactive = console.is_terminal() && !console.is_dumb_terminal();
211 if !interactive {
212 state.started = true;
215 state.live_id = None;
216 state.is_root = false;
217 state.alt_screen = false;
218 self.started_flag.store(false, Ordering::SeqCst);
219 return Ok(());
220 }
221
222 let renderable = state
223 .pending_renderable
224 .take()
225 .unwrap_or_else(|| Box::new(Text::plain("")));
226
227 let live_options = state.options.clone();
228 let (id, is_root) = console.live_start(renderable, live_options.vertical_overflow);
229 state.live_id = Some(id);
230 state.is_root = is_root;
231 state.started = true;
232 self.started_flag.store(true, Ordering::SeqCst);
233
234 if is_root {
235 if live_options.screen {
236 state.alt_screen = console.set_alt_screen(true)?;
237 }
238 let _ = console.show_cursor(false)?;
239 }
240
241 drop(console);
242 let auto_refresh = live_options.auto_refresh;
243 let is_root = state.is_root;
244 drop(state);
245
246 if is_root {
247 self.start_redirects(&live_options)?;
248 }
249
250 if refresh {
251 self.refresh()?;
252 }
253
254 if auto_refresh && is_root {
255 self.spawn_refresh_thread();
256 }
257
258 Ok(())
259 }
260
261 pub fn stop(&mut self) -> io::Result<()> {
262 self.stop_flag.store(true, Ordering::SeqCst);
263 if let Some(handle) = self.refresh_thread.take() {
264 let _ = handle.join();
265 }
266 self.stop_redirects();
267 self.started_flag.store(false, Ordering::SeqCst);
268
269 let mut state = self.state.lock().expect("live state mutex poisoned");
270 if !state.started {
271 return Ok(());
272 }
273 state.started = false;
274
275 let id = state.live_id.take();
276 let is_root = state.is_root;
277 state.is_root = false;
278 let alt_screen = state.alt_screen;
279 state.alt_screen = false;
280 let options = state.options.clone();
281
282 let Some(id) = id else {
283 if !options.transient {
285 let renderable = state
286 .pending_renderable
287 .take()
288 .unwrap_or_else(|| Box::new(Text::plain("")));
289 drop(state);
290 let mut console = self.console.lock().expect("console mutex poisoned");
291 let _ = console.print(renderable.as_ref(), None, None, None, false, "\n");
292 }
293 return Ok(());
294 };
295
296 let mut console = self.console.lock().expect("console mutex poisoned");
297
298 if !is_root {
300 let renderable = console.live_stop(id);
301 if !options.transient {
302 if let Some(renderable) = renderable {
303 let _ = console.print(renderable.as_ref(), None, None, None, false, "\n");
304 }
305 } else if console.is_terminal() && !console.is_dumb_terminal() {
306 let _ = console.print(&Control::new(), None, None, None, false, "");
308 }
309 return Ok(());
310 }
311
312 if is_root && console.is_terminal() && !console.is_dumb_terminal() && !alt_screen {
314 console.live_set_vertical_overflow(id, VerticalOverflowMethod::Visible);
315 let _ = console.print(&Control::new(), None, None, None, false, "");
316 }
317
318 let restore_controls = if is_root
321 && console.is_terminal()
322 && !console.is_dumb_terminal()
323 && options.transient
324 && !alt_screen
325 {
326 console.live_restore_cursor()
327 } else {
328 crate::Segments::new()
329 };
330
331 console.live_clear();
333
334 if is_root {
336 if console.is_terminal() && !alt_screen {
337 let _ = console.print(&Text::plain(""), None, None, None, false, "\n");
338 }
339
340 let _ = console.show_cursor(true);
341 if alt_screen {
342 let _ = console.set_alt_screen(false);
343 }
344 }
345
346 if !restore_controls.is_empty() {
347 let _ = console.print_segments(&restore_controls);
348 }
349
350 Ok(())
351 }
352
353 pub fn update(
354 &self,
355 renderable: Box<dyn Renderable + Send + Sync>,
356 refresh: bool,
357 ) -> io::Result<()> {
358 let (id, started) = {
359 let mut state = self.state.lock().expect("live state mutex poisoned");
360 if !state.started {
361 state.pending_renderable = Some(renderable);
362 return Ok(());
363 }
364 if state.live_id.is_none() {
365 state.pending_renderable = Some(renderable);
367 return Ok(());
368 }
369 (state.live_id, state.started)
370 };
371
372 if started {
373 if let Some(id) = id {
374 let mut console = self.console.lock().expect("console mutex poisoned");
375 console.live_update(id, renderable);
376 }
377 }
378 if refresh {
379 self.refresh()?;
380 }
381 Ok(())
382 }
383
384 pub fn refresh(&self) -> io::Result<()> {
385 let state = self.state.lock().expect("live state mutex poisoned");
386 if !state.started {
387 return Ok(());
388 }
389 if state.live_id.is_none() {
390 return Ok(());
392 }
393 drop(state);
394 let mut console = self.console.lock().expect("console mutex poisoned");
395 sync_terminal_size(&mut console);
396 console.print(&Control::new(), None, None, None, false, "")
397 }
398
399 pub fn print<R: Renderable + ?Sized>(
400 &self,
401 renderable: &R,
402 style: Option<Style>,
403 justify: Option<JustifyMethod>,
404 overflow: Option<OverflowMethod>,
405 no_wrap: bool,
406 end: &str,
407 ) -> io::Result<()> {
408 let mut console = self.console.lock().expect("console mutex poisoned");
409 console.print(renderable, style, justify, overflow, no_wrap, end)
410 }
411
412 pub fn log<R: Renderable + ?Sized>(
413 &self,
414 renderable: &R,
415 file: Option<&str>,
416 line: Option<u32>,
417 ) -> io::Result<()> {
418 let mut console = self.console.lock().expect("console mutex poisoned");
419 console.log(renderable, file, line)
420 }
421
422 fn spawn_refresh_thread(&mut self) {
423 if self.refresh_thread.is_some() {
424 return;
425 }
426
427 self.stop_flag.store(false, Ordering::SeqCst);
428 let stop_flag = self.stop_flag.clone();
429 let console = self.console.clone();
430 let state = self.state.clone();
431 let get_renderable = self.get_renderable.clone();
432 let refresh_per_second = state
433 .lock()
434 .expect("live state mutex poisoned")
435 .options
436 .refresh_per_second;
437
438 let handle = thread::spawn(move || {
439 let sleep = Duration::from_secs_f64(1.0 / refresh_per_second.max(0.001));
440 while !stop_flag.load(Ordering::SeqCst) {
441 thread::sleep(sleep);
442 if stop_flag.load(Ordering::SeqCst) {
443 break;
444 }
445 let state_guard = match state.lock() {
446 Ok(g) => g,
447 Err(_) => break,
448 };
449 if !state_guard.started {
450 continue;
451 }
452 let live_id = state_guard.live_id;
453 drop(state_guard);
454
455 if let Some(ref get_renderable) = get_renderable {
457 if let Some(id) = live_id {
458 let renderable = get_renderable();
459 let mut console_guard = match console.lock() {
460 Ok(g) => g,
461 Err(_) => break,
462 };
463 console_guard.live_update(id, renderable);
464 sync_terminal_size(&mut console_guard);
465 let _ = console_guard.print(&Control::new(), None, None, None, false, "");
466 continue;
467 }
468 }
469
470 let mut console_guard = match console.lock() {
471 Ok(g) => g,
472 Err(_) => break,
473 };
474 sync_terminal_size(&mut console_guard);
475 let _ = console_guard.print(&Control::new(), None, None, None, false, "");
476 }
477 });
478
479 self.refresh_thread = Some(handle);
480 }
481
482 #[cfg(not(unix))]
483 fn start_redirects(&mut self, _options: &LiveOptions) -> io::Result<()> {
484 Ok(())
485 }
486
487 #[cfg(unix)]
488 fn start_redirects(&mut self, options: &LiveOptions) -> io::Result<()> {
489 if options.redirect_stdout {
490 self.start_redirect_stream(1)?;
491 }
492 if options.redirect_stderr {
493 self.start_redirect_stream(2)?;
494 }
495 Ok(())
496 }
497
498 #[cfg(not(unix))]
499 fn stop_redirects(&mut self) {}
500
501 #[cfg(unix)]
502 fn stop_redirects(&mut self) {
503 for redirect in self.redirects.drain(..) {
504 let _guard = stream_redirect_lock()
505 .lock()
506 .expect("redirect lock mutex poisoned");
507 let _ = unsafe { dup2(redirect.original_fd, redirect.target_fd) };
508 let _ = unsafe { close(redirect.pipe_write_fd) };
509 let _ = unsafe { close(redirect.original_fd) };
510 drop(_guard);
511 let _ = redirect.worker.join();
512 }
513 }
514
515 #[cfg(unix)]
516 fn start_redirect_stream(&mut self, target_fd: RawFd) -> io::Result<()> {
517 let mut fds = [0_i32; 2];
518 if unsafe { pipe(fds.as_mut_ptr()) } == -1 {
519 return Err(io::Error::last_os_error());
520 }
521 let read_fd = fds[0];
522 let write_fd = fds[1];
523
524 let original_fd = unsafe { dup(target_fd) };
525 if original_fd == -1 {
526 let _ = unsafe { close(read_fd) };
527 let _ = unsafe { close(write_fd) };
528 return Err(io::Error::last_os_error());
529 }
530
531 if unsafe { dup2(write_fd, target_fd) } == -1 {
532 let _ = unsafe { close(read_fd) };
533 let _ = unsafe { close(write_fd) };
534 let _ = unsafe { close(original_fd) };
535 return Err(io::Error::last_os_error());
536 }
537
538 let console = self.console.clone();
539 let worker = thread::spawn(move || {
540 let file = unsafe { File::from_raw_fd(read_fd) };
541 let mut reader = BufReader::new(file);
542 let mut buf = Vec::<u8>::new();
543
544 loop {
545 buf.clear();
546 let bytes = match reader.read_until(b'\n', &mut buf) {
547 Ok(n) => n,
548 Err(_) => break,
549 };
550 if bytes == 0 {
551 break;
552 }
553
554 let has_newline = buf.last().copied() == Some(b'\n');
555 let text_slice = if has_newline {
556 &buf[..buf.len().saturating_sub(1)]
557 } else {
558 &buf[..]
559 };
560 if text_slice.is_empty() && has_newline {
561 continue;
562 }
563 let text = String::from_utf8_lossy(text_slice).to_string();
564 let end = if has_newline { "\n" } else { "" };
565
566 let _guard = stream_redirect_lock()
567 .lock()
568 .expect("redirect lock mutex poisoned");
569 if unsafe { dup2(original_fd, target_fd) } == -1 {
570 break;
571 }
572 {
573 let mut guard = match console.lock() {
574 Ok(g) => g,
575 Err(_) => break,
576 };
577 let _ = guard.print(&Text::plain(text), None, None, None, false, end);
578 }
579 if unsafe { dup2(write_fd, target_fd) } == -1 {
580 break;
581 }
582 }
583 });
584
585 self.redirects.push(StreamRedirect {
586 target_fd,
587 original_fd,
588 pipe_write_fd: write_fd,
589 worker,
590 });
591 Ok(())
592 }
593}
594
595impl Drop for Live {
596 fn drop(&mut self) {
597 let _ = self.stop();
599 }
600}
601
602fn sync_terminal_size(console: &mut Console<Stdout>) {
603 if !console.is_terminal() {
604 return;
605 }
606 if let Ok((w, h)) = terminal::size() {
607 let w = w as usize;
608 let h = h as usize;
609 let opts = console.options_mut();
610 opts.size = (w, h);
611 opts.max_width = w.max(1);
612 opts.max_height = h;
613 }
614}
615
#[cfg(test)]
mod tests {
    use super::*;

    /// The crate-internal accessor reports the configured refresh rate.
    #[test]
    fn test_refresh_per_second_accessor() {
        let options = LiveOptions {
            refresh_per_second: 7.5,
            ..LiveOptions::default()
        };
        let live = Live::with_options(Box::new(Text::plain("x")), options);
        assert_eq!(live.refresh_per_second(), 7.5);
    }

    /// Mutex used by tests that swap process-wide file descriptors.
    #[cfg(unix)]
    fn redirect_test_lock() -> &'static Mutex<()> {
        static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        TEST_LOCK.get_or_init(|| Mutex::new(()))
    }

    /// Installing and removing a stdout redirect records exactly one entry and
    /// then leaves none behind.
    #[cfg(unix)]
    #[test]
    fn test_redirect_stdout_lifecycle() {
        let _serial = redirect_test_lock()
            .lock()
            .expect("redirect test lock poisoned");

        let options = LiveOptions {
            redirect_stdout: true,
            ..LiveOptions::default()
        };
        let mut live = Live::with_options(Box::new(Text::plain("x")), options.clone());

        live.start_redirects(&options).unwrap();
        assert_eq!(live.redirects.len(), 1);

        live.stop_redirects();
        assert!(live.redirects.is_empty());
    }
}