assemble_core/
logging.rs

1//! Defines different parts of the logging utilities for assemble-daemon
2
3use crate::identifier::{ProjectId, TaskId};
4use crate::unstable::text_factory::AssembleFormatter;
5use atty::Stream;
6use colored::Colorize;
7use fern::{Dispatch, FormatCallback, Output};
8use indicatif::MultiProgress;
9use log::{Level, LevelFilter, Log, Record, SetLoggerError};
10use merge::Merge;
11use once_cell::sync::{Lazy, OnceCell};
12
13use std::cell::RefCell;
14use std::collections::{HashMap, VecDeque};
15
16use std::io::{stdout, ErrorKind, Write};
17use std::path::{Path, PathBuf};
18
19use std::ffi::OsStr;
20use std::sync::atomic::AtomicBool;
21use std::sync::mpsc::{channel, Sender};
22use std::sync::{Arc, Mutex};
23use std::thread::JoinHandle;
24use std::time::{Duration, Instant};
25use std::{fmt, io, thread};
26use thread_local::ThreadLocal;
27use time::format_description::FormatItem;
28use time::macros::format_description;
29use time::OffsetDateTime;
30
31/// Provides helpful logging args for clap clis
32#[derive(Debug, clap::Args, Clone, merge::Merge)]
33#[clap(next_help_heading = "Log Level")]
34pub struct LoggingArgs {
35    /// Only display error level log messages
36    #[clap(short, long)]
37    #[clap(conflicts_with_all(&["warn", "info", "debug", "trace"]))]
38    #[clap(display_order = 1)]
39    #[clap(global = true)]
40    #[merge(strategy = merge::bool::overwrite_false)]
41    error: bool,
42
43    /// Display warning and above level log messages
44    #[clap(short, long)]
45    #[clap(conflicts_with_all(&["error", "info", "debug", "trace"]))]
46    #[clap(display_order = 2)]
47    #[clap(global = true)]
48    #[merge(strategy = merge::bool::overwrite_false)]
49    warn: bool,
50
51    /// Display info and above level log messages
52    #[clap(short, long)]
53    #[clap(conflicts_with_all(&["error", "warn", "debug", "trace"]))]
54    #[clap(display_order = 3)]
55    #[clap(global = true)]
56    #[merge(strategy = merge::bool::overwrite_false)]
57    info: bool,
58
59    /// Display debug and above level log messages
60    #[clap(long, short)]
61    #[clap(conflicts_with_all(&["error", "warn", "info", "trace"]))]
62    #[clap(display_order = 4)]
63    #[clap(global = true)]
64    #[merge(strategy = merge::bool::overwrite_false)]
65    debug: bool,
66
67    /// Display trace and above level log messages
68    #[clap(long)]
69    #[clap(conflicts_with_all(&["error", "warn", "info", "debug"]))]
70    #[clap(display_order = 5)]
71    #[clap(global = true)]
72    #[merge(strategy = merge::bool::overwrite_false)]
73    trace: bool,
74
75    /// Show the source of a logging statement when running in any non complicated mode
76    #[clap(long)]
77    #[clap(help_heading = "Logging Settings")]
78    #[clap(global = true)]
79    #[merge(strategy =merge::bool::overwrite_false)]
80    pub show_source: bool,
81
82    /// Outputs everything as json
83    #[clap(long)]
84    #[clap(help_heading = "Logging Settings")]
85    #[clap(global = true)]
86    #[merge(strategy = merge::bool::overwrite_false)]
87    pub json: bool,
88
89    /// The console output mode.
90    #[clap(long, value_enum, default_value_t = ConsoleMode::Auto)]
91    #[clap(help_heading = "Logging Settings")]
92    #[clap(global = true)]
93    pub console: ConsoleMode,
94}
95
96impl Default for LoggingArgs {
97    fn default() -> Self {
98        Self {
99            show_source: false,
100            error: false,
101            warn: false,
102            info: false,
103            debug: true,
104            trace: false,
105            json: false,
106            console: ConsoleMode::Plain,
107        }
108    }
109}
110
/// Selects what is placed in front of each formatted log message.
///
/// * `Basic` - no timestamp or level prefix (the default).
/// * `TimeOnly` - time of day followed by the level.
/// * `Complicated` - UTC date and time, source file name and line, and the level.
/// * `Json` - each record is emitted as a serialized JSON object instead of text.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub enum OutputType {
    #[default]
    Basic,
    TimeOnly,
    Complicated,
    Json,
}
119
120#[derive(Debug, Copy, Clone, clap::ValueEnum, Eq, PartialEq)]
121#[repr(u8)]
122pub enum ConsoleMode {
123    Auto,
124    Rich,
125    Plain,
126}
127
128impl Merge for ConsoleMode {
129    fn merge(&mut self, other: Self) {
130        if self == &Self::Auto {
131            *self = other;
132        }
133    }
134}
135
136impl ConsoleMode {
137    pub fn resolve(self) -> Self {
138        match &self {
139            ConsoleMode::Auto => {
140                if atty::is(Stream::Stdout) {
141                    ConsoleMode::Rich
142                } else {
143                    ConsoleMode::Plain
144                }
145            }
146            ConsoleMode::Rich => self,
147            ConsoleMode::Plain => self,
148        }
149    }
150}
151
152impl LoggingArgs {
153    /// Gets the log level
154    pub fn log_level_filter(&self) -> LevelFilter {
155        if self.error {
156            LevelFilter::Error
157        } else if self.warn {
158            LevelFilter::Warn
159        } else if self.info {
160            LevelFilter::Info
161        } else if self.debug {
162            LevelFilter::Debug
163        } else if self.trace {
164            LevelFilter::Trace
165        } else {
166            LevelFilter::Info
167        }
168    }
169
170    /// Get the level filter from this args
171    fn config_from_settings(&self) -> (LevelFilter, OutputType) {
172        let level = self.log_level_filter();
173        let mut output_type = if self.error {
174            OutputType::Basic
175        } else if self.warn {
176            OutputType::Basic
177        } else if self.info {
178            OutputType::Basic
179        } else if self.debug {
180            OutputType::Basic
181        } else if self.trace {
182            OutputType::TimeOnly
183        } else {
184            OutputType::Basic
185        };
186        if self.json {
187            output_type = OutputType::Json;
188        }
189        (level, output_type)
190    }
191
192    pub fn init_root_logger(&self) -> Result<Option<JoinHandle<()>>, SetLoggerError> {
193        let (dispatch, handle) = self.create_logger();
194        dispatch.apply().map(|_| handle)
195    }
196
197    pub fn init_root_logger_with(filter: LevelFilter, mode: OutputType) {
198        Self::try_init_root_logger_with(filter, mode).expect("couldn't create dispatch");
199    }
200
201    pub fn try_init_root_logger_with(
202        filter: LevelFilter,
203        mode: OutputType,
204    ) -> Result<(), SetLoggerError> {
205        Self::create_logger_with(filter, mode, false, None).apply()
206    }
207
208    pub fn create_logger(&self) -> (Dispatch, Option<JoinHandle<()>>) {
209        let (filter, output_mode) = self.config_from_settings();
210        let rich: bool = match self.console.resolve() {
211            ConsoleMode::Auto => {
212                unreachable!()
213            }
214            ConsoleMode::Rich => true,
215            ConsoleMode::Plain => false,
216        };
217        if !rich {
218            colored::control::set_override(false);
219        }
220        let (started, handle) = start_central_logger(rich);
221        let central = CentralLoggerInput { sender: started };
222        let output = Output::from(Box::new(central) as Box<dyn Write + Send>);
223        (
224            Self::create_logger_with(filter, output_mode, self.show_source, output),
225            Some(handle),
226        )
227    }
228
229    pub fn create_logger_with(
230        filter: LevelFilter,
231        mode: OutputType,
232        show_source: bool,
233        output: impl Into<Option<Output>>,
234    ) -> Dispatch {
235        let dispatch = Dispatch::new()
236            .level(filter)
237            .chain(output.into().unwrap_or(Output::stdout("\n")));
238        match mode {
239            OutputType::Json => dispatch.format(Self::json_message_format),
240            other => dispatch.format(Self::message_format(other, show_source)),
241        }
242    }
243
244    fn message_format(
245        output_mode: OutputType,
246        show_source: bool,
247    ) -> impl Fn(FormatCallback, &fmt::Arguments, &log::Record) + Sync + Send + 'static {
248        move |out, message, record| {
249            out.finish(format_args!(
250                "{}{}",
251                {
252                    let prefix = Self::format_prefix(&output_mode, record, show_source);
253                    if prefix.is_empty() {
254                        prefix
255                    } else {
256                        format!("{} ", prefix)
257                    }
258                },
259                match record.level() {
260                    Level::Error => {
261                        format!("{}", message.to_string().red())
262                    }
263                    Level::Warn => {
264                        format!("{}", message.to_string().yellow())
265                    }
266                    Level::Info | Level::Debug => {
267                        message.to_string()
268                    }
269                    Level::Trace => {
270                        format!("{}", message.to_string().bright_blue())
271                    }
272                }
273            ))
274        }
275    }
276
277    fn json_message_format(format: FormatCallback, args: &fmt::Arguments, record: &log::Record) {
278        let message = format!("{}", args);
279        let level = record.level();
280        let origin = Origin::None;
281
282        let message_info = JsonMessageInfo {
283            level,
284            origin,
285            message,
286        };
287
288        let as_string = serde_json::to_string(&message_info).unwrap();
289
290        format.finish(format_args!("{}", as_string));
291    }
292
293    fn format_prefix(output_mode: &OutputType, record: &Record, show_source: bool) -> String {
294        use colored::Colorize;
295        let mut level_string = record.level().to_string().to_lowercase();
296
297        level_string = match record.level() {
298            Level::Error => level_string.red().to_string(),
299            Level::Warn => level_string.yellow().to_string(),
300            Level::Info => level_string.green().to_string(),
301            Level::Debug => level_string.blue().to_string(),
302            Level::Trace => level_string.bright_black().to_string(),
303        };
304        let output = match output_mode {
305            OutputType::Basic => String::new(),
306            OutputType::TimeOnly => {
307                static DATE_TIME_FORMAT: &[FormatItem] =
308                    format_description!("[hour]:[minute]:[second].[subsecond digits:4]");
309
310                let time = OffsetDateTime::now_local().unwrap_or(OffsetDateTime::now_utc());
311                format!(
312                    "[{}] {: >7}:",
313                    time.format(DATE_TIME_FORMAT).unwrap(),
314                    level_string
315                )
316            }
317            OutputType::Complicated => {
318                static DATE_TIME_FORMAT: &[FormatItem] = format_description!("[year]/[month]/[day] [hour]:[minute]:[second].[subsecond digits:4] [offset_hour sign:mandatory padding:none] UTC");
319
320                let time = OffsetDateTime::now_utc();
321                let file_path = Path::new(record.file().unwrap_or("unknown"));
322                format!(
323                    "[{} {}{} {}]",
324                    time.format(DATE_TIME_FORMAT).unwrap(),
325                    file_path.file_name().and_then(|s| s.to_str()).unwrap(),
326                    record
327                        .line()
328                        .map(|l| format!(":{l}"))
329                        .unwrap_or("".to_string()),
330                    level_string
331                )
332            }
333            _ => {
334                unreachable!()
335            }
336        };
337        if show_source {
338            if let Some((module, file)) = record.module_path().zip(record.file()) {
339                let line = record.line().map(|i| format!(":{}", i)).unwrap_or_default();
340                let crate_name = module.split("::").next().unwrap();
341                let source: PathBuf = Path::new(file)
342                    .iter()
343                    .skip_while(|&p| p != OsStr::new("src"))
344                    .skip(1)
345                    .collect();
346
347                let source = format!(
348                    "({crate_name} :: {source}{line})",
349                    source = source.to_string_lossy()
350                )
351                .italic();
352
353                format!("{source} {output}")
354            } else {
355                format!("(<unknown source>) {output}")
356            }
357        } else {
358            output
359        }
360    }
361}
362
363pub fn init_root_log(level: LevelFilter, mode: impl Into<Option<OutputType>>) {
364    let mode = mode.into().unwrap_or_default();
365    let _ = LoggingArgs::try_init_root_logger_with(level, mode);
366}
367
368#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, Hash)]
369pub enum Origin {
370    Project(ProjectId),
371    Task(TaskId),
372    None,
373}
374
375impl From<ProjectId> for Origin {
376    fn from(p: ProjectId) -> Self {
377        Self::Project(p)
378    }
379}
380
381impl From<TaskId> for Origin {
382    fn from(t: TaskId) -> Self {
383        Self::Task(t)
384    }
385}
386
387#[derive(Debug, Serialize, Deserialize)]
388pub struct JsonMessageInfo {
389    #[serde(with = "LevelDef")]
390    pub level: Level,
391    pub origin: Origin,
392    pub message: String,
393}
394
395#[derive(Serialize, Deserialize)]
396#[serde(remote = "Level")]
397enum LevelDef {
398    Error,
399    Warn,
400    Info,
401    Debug,
402    Trace,
403}
404
405static THREAD_ORIGIN: Lazy<ThreadLocal<RefCell<Origin>>> = Lazy::new(ThreadLocal::new);
406
407fn thread_origin() -> Origin {
408    THREAD_ORIGIN
409        .get_or(|| RefCell::new(Origin::None))
410        .borrow()
411        .clone()
412}
413
/// Handle through which logging is steered at runtime.
///
/// The private unit field keeps code outside this module from constructing
/// its own instance; use the shared `LOGGING_CONTROL` static instead.
pub struct LoggingControl(());
415
416/// Provides access to the logging control of the entire program
417pub static LOGGING_CONTROL: Lazy<LoggingControl> = Lazy::new(|| LoggingControl(()));
418
419impl LoggingControl {
420    /// Sets the thread local origin
421    fn use_origin(&self, new_origin: Origin) {
422        trace!(
423            "setting the origin for thread {:?} to {:?}",
424            thread::current().id(),
425            new_origin
426        );
427        let origin = THREAD_ORIGIN.get_or(|| RefCell::new(Origin::None));
428        let mut ref_mut = origin.borrow_mut();
429        *ref_mut = new_origin;
430    }
431
432    pub fn in_project(&self, project: ProjectId) {
433        self.use_origin(Origin::Project(project))
434        // trace!("set origin to {:?}", ref_mut);
435    }
436
437    pub fn in_task(&self, task: TaskId) {
438        self.use_origin(Origin::Task(task))
439        // trace!("set origin to {:?}", ref_mut);
440    }
441
442    pub fn reset(&self) {
443        self.use_origin(Origin::None)
444        // trace!("set origin to {:?}", ref_mut);
445    }
446
447    pub fn stop_logging(&self) {
448        let lock = LOG_COMMAND_SENDER.get().unwrap();
449        let sender = lock.lock().unwrap();
450
451        sender.send(LoggingCommand::Stop).unwrap();
452    }
453
454    pub fn start_task(&self, id: &TaskId) {
455        let lock = LOG_COMMAND_SENDER.get().unwrap();
456        let sender = lock.lock().unwrap();
457
458        sender
459            .send(LoggingCommand::TaskStarted(id.clone()))
460            .unwrap();
461    }
462
463    pub fn end_task(&self, id: &TaskId) {
464        let lock = LOG_COMMAND_SENDER.get().unwrap();
465        let sender = lock.lock().unwrap();
466
467        sender.send(LoggingCommand::TaskEnded(id.clone())).unwrap();
468    }
469
470    /// Start a progress bar. Returns err if a progress bar has already been started. If Ok, the
471    /// returned value is a clone of the multi-progress bar
472    pub fn start_progress_bar(&self, bar: &MultiProgress) -> Result<MultiProgress, ()> {
473        let lock = LOG_COMMAND_SENDER.get().unwrap();
474        let sender = lock.lock().unwrap();
475        sender
476            .send(LoggingCommand::StartMultiProgress(bar.clone()))
477            .unwrap();
478        Ok(bar.clone())
479    }
480
481    /// End a progress bar if it exists
482    pub fn end_progress_bar(&self) {
483        let lock = LOG_COMMAND_SENDER.get().unwrap();
484        let sender = lock.lock().unwrap();
485
486        sender.send(LoggingCommand::EndMultiProgress).unwrap();
487    }
488
489    /// Run a closure within an origin context
490    #[cfg(feature = "log_origin_control")]
491    pub fn with_origin<O: Into<Origin>, F: FnOnce() -> R, R>(&self, origin: O, func: F) -> R {
492        let origin = origin.into();
493
494        self.use_origin(origin);
495        let ret = (func)();
496        self.reset();
497        ret
498    }
499
500    /// Gets the origin currently set
501    #[cfg(feature = "log_origin_control")]
502    pub fn get_origin(&self) -> Origin {
503        THREAD_ORIGIN
504            .get_or(|| RefCell::new(Origin::None))
505            .borrow()
506            .clone()
507    }
508}
509
510static CONTINUE_LOGGING: AtomicBool = AtomicBool::new(true);
511static LOG_COMMAND_SENDER: OnceCell<Arc<Mutex<Sender<LoggingCommand>>>> = OnceCell::new();
512
513fn start_central_logger(rich: bool) -> (Sender<LoggingCommand>, JoinHandle<()>) {
514    let (send, recv) = channel();
515    let _ = LOG_COMMAND_SENDER.set(Arc::new(Mutex::new(send.clone())));
516    let handle = thread::spawn(move || {
517        let mut central_logger = CentralLoggerOutput::new();
518        loop {
519            let command = match recv.recv() {
520                Ok(s) => s,
521                Err(_) => break,
522            };
523
524            match command {
525                LoggingCommand::LogString(o, s) => {
526                    central_logger.add_output(o, &s);
527                    central_logger.flush_current_origin();
528                }
529                LoggingCommand::Flush => central_logger.flush(),
530                LoggingCommand::Stop => {
531                    break;
532                }
533                LoggingCommand::TaskStarted(s) => {
534                    if !rich {
535                        central_logger.add_output(Origin::Task(s), "");
536                        central_logger.flush_current_origin();
537                    }
538                }
539                LoggingCommand::TaskEnded(_s) => {}
540                LoggingCommand::TaskStatus(_, _) => {}
541                LoggingCommand::StartMultiProgress(b) => {
542                    central_logger.start_progress_bar(&b).unwrap();
543                }
544                LoggingCommand::EndMultiProgress => {
545                    central_logger.end_progress_bar();
546                }
547            }
548        }
549
550        central_logger.flush();
551    });
552    LOGGING_CONTROL.reset();
553    (send, handle)
554}
555
556pub enum LoggingCommand {
557    LogString(Origin, String),
558    TaskStarted(TaskId),
559    TaskEnded(TaskId),
560    TaskStatus(TaskId, String),
561    StartMultiProgress(MultiProgress),
562    EndMultiProgress,
563    Flush,
564    Stop,
565}
566
567pub struct CentralLoggerInput {
568    sender: Sender<LoggingCommand>,
569}
570
571assert_impl_all!(CentralLoggerInput: Send, Write);
572
573impl io::Write for CentralLoggerInput {
574    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
575        let string = String::from_utf8_lossy(buf).to_string();
576
577        let origin = thread_origin();
578        // println!("sending from origin: {origin:?}");
579        self.sender
580            .send(LoggingCommand::LogString(origin, string))
581            .map_err(|e| io::Error::new(ErrorKind::Interrupted, e))?;
582        Ok(buf.len())
583    }
584
585    fn flush(&mut self) -> io::Result<()> {
586        self.sender
587            .send(LoggingCommand::Flush)
588            .map_err(|e| io::Error::new(ErrorKind::Interrupted, e))
589    }
590}
591
592#[derive(Debug)]
593pub struct CentralLoggerOutput {
594    saved_output: HashMap<Origin, String>,
595    origin_buffers: HashMap<Origin, String>,
596    origin_queue: VecDeque<Origin>,
597    previous: Option<Origin>,
598    last_query: Option<Instant>,
599    progress_bar: Option<MultiProgress>,
600}
601
602impl CentralLoggerOutput {
603    pub fn new() -> Self {
604        Self {
605            saved_output: Default::default(),
606            origin_buffers: Default::default(),
607            origin_queue: Default::default(),
608            previous: None,
609            last_query: None,
610            progress_bar: None,
611        }
612    }
613
614    pub fn add_output(&mut self, origin: Origin, msg: &str) {
615        let buffer = self.origin_buffers.entry(origin.clone()).or_default();
616        *buffer = format!("{}{}", buffer, msg);
617        if let Some(front) = self.origin_queue.front() {
618            if front != &origin {
619                if self.last_query.unwrap().elapsed() >= Duration::from_millis(100) {
620                    self.origin_queue.pop_front();
621                }
622                self.origin_queue.push_back(origin);
623            }
624            self.last_query = Some(Instant::now());
625        } else {
626            self.origin_queue.push_back(origin);
627        }
628    }
629
630    /// Flushes current lines from an origin
631    pub fn flush_current_origin(&mut self) {
632        self.last_query = Some(Instant::now());
633        let origin = self.origin_queue.front().cloned().unwrap_or(Origin::None);
634
635        if Some(&origin) != self.previous.as_ref() {
636            match &origin {
637                Origin::Project(p) => {
638                    self.println(format!(
639                        "{}",
640                        AssembleFormatter::default()
641                            .project_status(p, "configuring")
642                            .unwrap()
643                    ))
644                    .unwrap();
645                }
646                Origin::Task(t) => {
647                    self.println(format!(
648                        "{}",
649                        AssembleFormatter::default().task_status(t, "").unwrap()
650                    ))
651                    .unwrap();
652                }
653                Origin::None => {}
654            }
655        }
656
657        self.previous = Some(origin.clone());
658        let printer = self.logger_stdout();
659        let saved = self.saved_output.entry(origin.clone()).or_default();
660        if let Some(buffer) = self.origin_buffers.get_mut(&origin) {
661            let mut lines = Vec::new();
662            while let Some(position) = buffer.chars().position(|c| c == '\n') {
663                let head = &buffer[..position];
664                let tail = buffer.get((position + 1)..).unwrap_or_default();
665
666                lines.push(head.to_string());
667
668                *buffer = tail.to_string();
669            }
670
671            for line in lines {
672                if !(saved.trim().is_empty() && line.trim().is_empty()) {
673                    printer.println(&line).unwrap();
674                    *saved = format!("{}{}", saved, line);
675                }
676            }
677
678            if buffer.trim().is_empty() {
679                self.origin_queue.pop_front();
680            }
681        }
682    }
683
684    pub fn flush(&mut self) {
685        let printer = self.logger_stdout();
686        let drained = self.origin_queue.drain(..).collect::<Vec<_>>();
687        for origin in drained {
688            if let Some(str) = self.origin_buffers.get_mut(&origin) {
689                printer.println(format!("{origin:?}: {}", str)).unwrap();
690                str.clear();
691            }
692        }
693        stdout().flush().unwrap();
694    }
695
696    pub fn println(&self, string: impl AsRef<str>) -> io::Result<()> {
697        match &self.progress_bar {
698            None => {
699                writeln!(stdout(), "{}", string.as_ref())
700            }
701            Some(p) => p.println(string),
702        }
703    }
704
705    pub fn logger_stdout(&self) -> LoggerStdout {
706        LoggerStdout {
707            progress: self.progress_bar.clone(),
708        }
709    }
710
711    /// Start a progress bar. Returns err if a progress bar has already been started. If Ok, the
712    /// returned value is a clone of the multi-progress bar
713    pub fn start_progress_bar(&mut self, bar: &MultiProgress) -> Result<MultiProgress, ()> {
714        self.progress_bar = Some(bar.clone());
715        Ok(bar.clone())
716    }
717
718    /// End a progress bar if it exists
719    pub fn end_progress_bar(&mut self) {
720        let replaced = std::mem::replace(&mut self.progress_bar, None);
721        if let Some(replaced) = replaced {
722            replaced.clear().unwrap();
723        }
724    }
725}
726
727pub struct LoggerStdout {
728    progress: Option<MultiProgress>,
729}
730
731impl LoggerStdout {
732    pub fn println(&self, string: impl AsRef<str>) -> io::Result<()> {
733        match &self.progress {
734            None => {
735                writeln!(stdout(), "{}", string.as_ref())
736            }
737            Some(p) => p.println(string),
738        }
739    }
740}