use std::{path::Path, process::ExitStatus, time::Duration};
use clap::ValueEnum;
use serde_json::Value;
use tracing::{Level, debug, error, info, trace, warn};
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
use uuid::Uuid;
use radicle::{git::Oid, identity::RepoId, node::Event, patch::PatchId};
use radicle_job::{JobId, Reason};
use crate::{
adapter::Adapter,
ci_event::CiEvent,
ci_event_source::{CiEventSource, CiEventSourceError},
config::Config,
db::{QueueId, QueuedCiEvent},
filter::{Decision, EventFilter},
msg::{Request, RunId},
node_event_source::NodeEventSource,
pages::PageError,
queueadd::AdderError,
queueproc::{MaybeShutdown, QueueError},
run::Run,
timeoutcmd::TimeoutError,
};
/// Errors from converting a string or JSON value into a [`Kind`].
#[derive(Debug, thiserror::Error)]
pub enum KindError {
/// The string does not name any known kind.
#[error("unknown log message kind {0:?}")]
Unknown(String),
/// The JSON value is not a string, so it cannot name a kind.
#[error("unknown log message kind {0:?}")]
UnknownValue(Value),
}
// Broad category of a log message, recorded in the `kind` field of the
// structured log events emitted by this module.
//
// Plain `//` comments are used here on purpose: the enum derives
// `ValueEnum`, and doc comments on its variants could end up in
// command line help text.
#[derive(Debug, Clone, ValueEnum, Eq, PartialEq)]
pub enum Kind {
// Rendered as "debug".
Debug,
// Rendered as "startup".
Startup,
// Rendered as "shutdown".
Shutdown,
// Rendered as "got_event".
GotEvent,
// Rendered as "start_run".
StartRun,
// Rendered as "adapter_message".
AdapterMessage,
// Rendered as "finish_run".
FinishRun,
// Rendered as "filter_decision".
FilterDecision,
}
/// Render a [`Kind`] as the snake_case name used in the `kind` log field.
impl std::fmt::Display for Kind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Debug => "debug",
            Self::Startup => "startup",
            Self::Shutdown => "shutdown",
            Self::GotEvent => "got_event",
            Self::StartRun => "start_run",
            Self::FinishRun => "finish_run",
            Self::AdapterMessage => "adapter_message",
            Self::FilterDecision => "filter_decision",
        })
    }
}
/// Parse the snake_case name of a kind, as produced by its `Display` impl.
impl TryFrom<&str> for Kind {
    type Error = KindError;

    fn try_from(value: &str) -> Result<Self, Self::Error> {
        let kind = match value {
            "debug" => Self::Debug,
            "startup" => Self::Startup,
            "shutdown" => Self::Shutdown,
            "got_event" => Self::GotEvent,
            "start_run" => Self::StartRun,
            "finish_run" => Self::FinishRun,
            "adapter_message" => Self::AdapterMessage,
            "filter_decision" => Self::FilterDecision,
            // Anything else is not a kind we know about.
            unknown => return Err(KindError::Unknown(unknown.into())),
        };
        Ok(kind)
    }
}
/// Parse a kind from a JSON value.
///
/// Only JSON strings can name a kind. A string is parsed by the
/// `TryFrom<&str>` implementation, so the table of names lives in exactly
/// one place. Any other JSON value is returned inside
/// [`KindError::UnknownValue`].
impl TryFrom<Value> for Kind {
    type Error = KindError;

    fn try_from(value: Value) -> Result<Self, Self::Error> {
        match value {
            Value::String(s) => Self::try_from(s.as_str()),
            other => Err(KindError::UnknownValue(other)),
        }
    }
}
/// Identifier for each distinct log message emitted by this module.
///
/// The logging functions below record one of these in the `msg_id` field
/// of the event, using the `Debug` form of the variant (`?Id::...`), so
/// the variant name itself is what appears in the log output.
#[derive(Debug, Copy, Clone)]
enum Id {
AdapterConfig,
AdapterExitCode,
AdapterInvoluntaryExit,
AdapterNoExit,
AdapterNoFirstMessage,
AdapterNoSecondMessage,
AdapterStderrLine,
AdapterStdoutLine,
AdapterTempConfig,
AdapterTimedOut,
AdapterTooManyMessages,
BrokerDatabase,
BrokerRunEnd,
BrokerRunStart,
CiEventSourceCreated,
CiEventSourceDisconnected,
CiEventSourceEnd,
CiEventSourceGotEvents,
CibConfig,
CibEndFailure,
CibEndSuccess,
CibStart,
ConfigDeprecated,
JobCreate,
JobFailure,
JobRunCreate,
JobRunFinished,
NodeEventSourceCreated,
NodeEventSourceDisconnected,
NodeEventSourceEnd,
NodeEventSourceEndOfFile,
NodeEventSourceReceivedEvent,
PagesDisconnected,
PagesEnd,
PagesInterval,
PagesNoDirSet,
PagesDirDoesNotExist,
PagesStart,
QueueAddEnd,
QueueAddEndEvents,
QueueAddEnqueueEvent,
QueueAddStart,
QueueProcActionRun,
QueueProcActionShutdown,
QueueProcActionTerminate,
QueueProcDisconnected,
QueueProcEnd,
QueueProcFilterDecision,
QueueProcFinishedRun,
QueueProcPickedEvent,
QueueProcProcessingEvent,
QueueProcProcessedEvent,
QueueProcQueueLength,
QueueProcRemoveEvent,
QueueProcStart,
QueueProcThreadJoin,
QueueProcTrigger,
QueueProcProcessorResult,
QueueProcWorkerResult,
TriggerCreate,
FilterDecision,
WorkerStart,
WorkerEnd,
}
/// Errors from converting a string or JSON value into a [`LogLevel`].
#[derive(Debug, thiserror::Error)]
pub enum LogLevelError {
/// The string does not name a known log level.
#[error("unknown log level {0}")]
UnknownLogLevel(String),
/// The JSON value is not a string, so it cannot name a log level.
#[error("unknown log level {0:?}")]
UnknownLogLevelValue(Value),
}
// Severity of a log message. The derived ordering follows declaration
// order, so `Trace` is the lowest level and `Error` the highest.
//
// Plain `//` comments are used here on purpose: the enum derives
// `ValueEnum`, and doc comments on its variants could end up in
// command line help text.
#[derive(ValueEnum, Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug)]
pub enum LogLevel {
Trace,
Debug,
Info,
Warning,
Error,
}
/// Parse an upper case log level name.
///
/// Both `"WARN"` and `"WARNING"` are accepted for [`LogLevel::Warning`]:
/// `tracing` itself renders its warning level as `WARN`, so level names
/// taken from log output written via `tracing` would otherwise be
/// rejected. Matching is case sensitive.
///
/// # Errors
///
/// Returns [`LogLevelError::UnknownLogLevel`] for any other string.
impl TryFrom<&str> for LogLevel {
    type Error = LogLevelError;

    fn try_from(s: &str) -> Result<Self, LogLevelError> {
        match s {
            "TRACE" => Ok(Self::Trace),
            "DEBUG" => Ok(Self::Debug),
            "INFO" => Ok(Self::Info),
            "WARN" | "WARNING" => Ok(Self::Warning),
            "ERROR" => Ok(Self::Error),
            _ => Err(LogLevelError::UnknownLogLevel(s.into())),
        }
    }
}
/// Parse a log level from an owned string by deferring to the `&str` parser.
impl TryFrom<String> for LogLevel {
    type Error = LogLevelError;

    fn try_from(s: String) -> Result<Self, LogLevelError> {
        <Self as TryFrom<&str>>::try_from(s.as_str())
    }
}
/// Parse a log level from a borrowed JSON value.
///
/// A JSON string is handed to the `&str` parser; any other JSON value is
/// cloned into [`LogLevelError::UnknownLogLevelValue`].
impl TryFrom<&Value> for LogLevel {
    type Error = LogLevelError;

    fn try_from(v: &Value) -> Result<Self, LogLevelError> {
        if let Value::String(name) = v {
            Self::try_from(name.as_str())
        } else {
            Err(LogLevelError::UnknownLogLevelValue(v.clone()))
        }
    }
}
/// Parse a log level from an owned JSON value via the `&Value` parser.
impl TryFrom<Value> for LogLevel {
    type Error = LogLevelError;

    fn try_from(v: Value) -> Result<Self, LogLevelError> {
        <Self as TryFrom<&Value>>::try_from(&v)
    }
}
/// Render a log level as a lower case name.
///
/// Note that [`LogLevel::Warning`] is rendered as `warn`. The `open`
/// function in this module passes this rendering to `EnvFilter` as the
/// filter directive.
impl std::fmt::Display for LogLevel {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Trace => "trace",
            Self::Debug => "debug",
            Self::Info => "info",
            Self::Warning => "warn",
            Self::Error => "error",
        })
    }
}
impl From<LogLevel> for tracing::Level {
fn from(log_level: LogLevel) -> Self {
match log_level {
LogLevel::Trace => Level::TRACE,
LogLevel::Debug => Level::DEBUG,
LogLevel::Info => Level::INFO,
LogLevel::Warning => Level::WARN,
LogLevel::Error => Level::ERROR,
}
}
}
/// Install the global log subscriber.
///
/// Log events are written as JSON, without the event target, and are
/// filtered using `level` as the filter directive. Output goes to stderr,
/// except in test builds, where a `TestWriter` is used instead.
///
/// # Panics
///
/// Panics if the filter directive is rejected. The directive is the
/// `Display` form of a [`LogLevel`], which is one of a fixed set of plain
/// level names, hence the `unwrap`.
///
/// NOTE(review): `init()` presumably also panics if a global subscriber
/// has already been installed — confirm against the tracing-subscriber
/// documentation.
#[allow(clippy::unwrap_used)]
pub fn open(level: LogLevel) {
    let filter_layer = EnvFilter::try_new(level.to_string()).unwrap();

    #[cfg(test)]
    let writer = fmt::TestWriter::new();
    #[cfg(not(test))]
    let writer = std::io::stderr;

    let fmt_layer = fmt::layer().with_target(false).with_writer(writer).json();

    let subscriber = tracing_subscriber::registry()
        .with(filter_layer)
        .with(fmt_layer);
    subscriber.init();
}
/// Emit an info-level log event from a caller-supplied string.
///
/// NOTE(review): `msg` is the macro's only argument, so it is presumably
/// recorded as a field named `msg` rather than as the event's message —
/// confirm against the tracing macro documentation.
pub fn info(msg: String) {
info!(msg)
}
/// Log at info level that the CI broker starts.
///
/// The `version` field is the compile-time value of the `VERSION`
/// environment variable.
pub fn start_cib() {
info!(
msg_id = ?Id::CibStart,
kind = %Kind::Startup,
version = env!("VERSION"),
"CI broker starts"
);
}
/// Log at info level that the CI broker ends successfully (`success = true`).
pub fn end_cib_successfully() {
info!(
msg_id = ?Id::CibEndSuccess,
kind = %Kind::Shutdown,
success = true,
"CI broker ends successfully"
);
}
/// Log at error level that the CI broker ends in an unrecoverable error
/// (`success = false`).
pub fn end_cib_in_error() {
error!(
msg_id = ?Id::CibEndFailure,
kind = %Kind::Shutdown,
success = false,
"CI broker ends in unrecoverable error"
);
}
/// Warn that a configuration field is deprecated and ignored.
///
/// `field` is the name of the field and `filename` the path that is
/// logged alongside it.
pub fn config_deprecated(field: &'static str, filename: &Path) {
warn!(
msg_id = ?Id::ConfigDeprecated,
kind = %Kind::Startup,
field,
?filename,
"configuration field is deprecated and ignored",
);
}
/// Log at debug level that a node event source was created.
///
/// The `Debug` form of `source` is recorded as `node_event_source`.
pub fn node_event_source_created(source: &NodeEventSource) {
debug!(
msg_id = ?Id::NodeEventSourceCreated,
kind = %Kind::Startup,
node_event_source = ?source,
"created node event source"
);
}
/// Log at trace level that the node event source received an event.
///
/// The `Debug` form of `event` is recorded as `node_event`.
pub fn node_event_source_got_event(event: &Event) {
trace!(
msg_id = ?Id::NodeEventSourceReceivedEvent,
kind = %Kind::GotEvent,
node_event = ?event,
"node event source received event"
);
}
/// Log at debug level that the node event source hit end of file on the
/// control socket.
pub fn node_event_source_eof(source: &NodeEventSource) {
debug!(
msg_id = ?Id::NodeEventSourceEndOfFile,
kind = %Kind::Shutdown,
node_event_source = ?source,
"node event source end of file on control socket"
);
}
/// Log at debug level that a CI event source was created.
pub fn ci_event_source_created(source: &CiEventSource) {
debug!(
msg_id = ?Id::CiEventSourceCreated,
kind = %Kind::Startup,
?source,
"created CI event source"
);
}
/// Log at trace level the CI events that the CI event source received.
pub fn ci_event_source_got_events(events: &[CiEvent]) {
trace!(
msg_id = ?Id::CiEventSourceGotEvents,
kind = %Kind::GotEvent,
?events,
"CI event source received events"
);
}
/// Log at debug level that the CI event source received a disconnection.
pub fn ci_event_source_disconnected() {
debug!(
msg_id = ?Id::CiEventSourceDisconnected,
kind = %Kind::Shutdown,
"CI event source received disconnection"
);
}
/// Log at debug level that the CI event source was notified of the end
/// of events.
pub fn ci_event_source_end() {
debug!(
msg_id = ?Id::CiEventSourceEnd,
kind = %Kind::Debug,
"CI event source was notified end of events"
);
}
/// Log at debug level the configuration that was loaded, in its `Debug`
/// form.
pub fn loaded_config(config: &Config) {
debug!(
msg_id = ?Id::CibConfig,
kind = %Kind::Startup,
?config,
"loaded configuration"
);
}
/// Log at trace level the configuration under the "adapter configuration"
/// message, in its `Debug` form.
pub fn adapter_config(config: &Config) {
trace!(
msg_id = ?Id::AdapterConfig,
kind = %Kind::Debug,
?config,
"adapter configuration"
);
}
/// Log at info level that the event processing thread starts.
///
/// `concurrent_adapters` is recorded as a field of the same name.
pub fn queueproc_start(concurrent_adapters: usize) {
info!(
msg_id = ?Id::QueueProcStart,
kind = %Kind::Startup,
concurrent_adapters,
"start thread to process events until a shutdown event"
);
}
/// Log that the event processing thread ends.
///
/// The message and fields are the same either way; only the level
/// differs: error if `result` is an error, debug otherwise.
pub fn queueproc_end(result: &Result<(), QueueError>) {
    match result {
        Err(_) => {
            error!(
                msg_id = ?Id::QueueProcEnd,
                kind = %Kind::Debug,
                ?result,
                "thread to process events ends"
            );
        }
        Ok(()) => {
            debug!(
                msg_id = ?Id::QueueProcEnd,
                kind = %Kind::Debug,
                ?result,
                "thread to process events ends"
            );
        }
    }
}
/// Log at info level that the thread processing results of CI runs starts.
///
/// This uses the same message ID, `QueueProcStart`, as `queueproc_start`.
pub fn queueproc_start_result_receiver() {
info!(
msg_id = ?Id::QueueProcStart,
kind = %Kind::Startup,
"start thread to process results of CI runs"
);
}
/// Log at info level that the thread processing results of CI runs ends.
///
/// The message is tagged with the `QueueProcEnd` message ID and the
/// shutdown kind, so that it is not mistaken for the matching start
/// message when log output is filtered by `msg_id` or `kind`.
pub fn queueproc_end_result_receiver() {
    info!(
        msg_id = ?Id::QueueProcEnd,
        kind = %Kind::Shutdown,
        "end thread to process results of CI runs"
    );
}
/// Log at debug level that the event notification channel disconnected.
pub fn queueproc_channel_disconnect() {
debug!(
msg_id = ?Id::QueueProcDisconnected,
kind = %Kind::Debug,
"event notification channel disconnected"
);
}
/// Log at trace level the length of the event queue.
///
/// `len` is recorded using its `Debug` form.
pub fn queueproc_queue_length(len: usize) {
trace!(
msg_id = ?Id::QueueProcQueueLength,
kind = %Kind::Debug,
?len,
"event queue length"
);
}
/// Warn about a regular expression syntax error.
///
/// `pattern` is the offending pattern and `err` the error from the
/// `regex` crate; both are recorded in their `Debug` form.
pub fn queueproc_filter_regex_error(pattern: &str, err: regex::Error) {
warn!(
msg_id = ?Id::QueueProcFilterDecision,
kind = %Kind::FilterDecision,
?pattern,
?err,
"regular expression syntax error");
}
pub fn queueproc_filter_decision(event: &CiEvent, filter: &EventFilter, decision: &Decision) {
let decision = serde_json::to_string(decision).unwrap_or("DECISION AS JSON ERROR".into());
info!(
msg_id = ?Id::QueueProcFilterDecision,
kind = %Kind::FilterDecision,
?event,
?filter,
decision,
"filter decision"
);
}
/// Log at trace level a filter predicate's decision, recording the event,
/// the filter, and whether the event was allowed.
pub fn queueproc_predicate_decision(event: &CiEvent, filter: &EventFilter, allowed: bool) {
trace!(
msg_id = ?Id::QueueProcFilterDecision,
kind = %Kind::FilterDecision,
?event,
?filter,
?allowed,
"filter predicate decision"
);
}
/// Log at info level that an event was picked from the event queue.
///
/// The queue ID is converted to a string and recorded as `id`; the queued
/// event is recorded in its `Debug` form.
pub fn queueproc_picked_event(id: &QueueId, event: &QueuedCiEvent) {
    let queue_id = id.to_string();
    info!(
        msg_id = ?Id::QueueProcPickedEvent,
        kind = %Kind::GotEvent,
        id = %queue_id,
        ?event,
        "picked an event from event queue"
    );
}
/// Log at debug level that an event is being processed, recording the
/// event and the adapter.
pub fn queueproc_processing_event(event: &CiEvent, adapter: &Adapter) {
debug!(
msg_id = ?Id::QueueProcProcessingEvent,
kind = %Kind::GotEvent,
?event,
?adapter,
"processing event"
);
}
/// Log at debug level the result of a worker thread.
pub fn queueproc_worker_thread_result(result: Result<(), &QueueError>) {
debug!(
msg_id = ?Id::QueueProcWorkerResult,
kind = %Kind::GotEvent,
?result,
"worker thread result"
);
}
/// Log at debug level the result of the processor thread.
pub fn queueproc_processor_thread_result(result: Result<(), &QueueError>) {
debug!(
msg_id = ?Id::QueueProcProcessorResult,
kind = %Kind::GotEvent,
?result,
"processor thread result"
);
}
/// Log that joining a thread failed.
///
/// NOTE(review): this failure is logged at info level, not warn or
/// error — confirm that is intended.
pub fn queueproc_thread_join() {
info!(
msg_id = ?Id::QueueProcThreadJoin,
kind = %Kind::FinishRun,
"joining thread failed"
);
}
/// Log at debug level the result of building a trigger request.
pub fn queueproc_trigger(result: &Result<Request, QueueError>) {
debug!(
msg_id = ?Id::QueueProcTrigger,
kind = %Kind::GotEvent,
?result,
"trigger request result"
);
}
/// Log at info level the result of processing an event.
pub fn queueproc_processed_event(result: &Result<MaybeShutdown, QueueError>) {
info!(
msg_id = ?Id::QueueProcProcessedEvent,
kind = %Kind::GotEvent,
?result,
"result of processing event"
);
}
/// Log at debug level that an event is removed from the queue.
pub fn queueproc_remove_event(event: &QueuedCiEvent) {
debug!(
msg_id = ?Id::QueueProcRemoveEvent,
kind = %Kind::Debug,
?event,
"remove event from queue"
);
}
/// Log at debug level that a run finished, recording the optional
/// repository ID.
pub fn queueproc_finished_run(repoid: &Option<RepoId>) {
debug!(
msg_id = ?Id::QueueProcFinishedRun,
kind = %Kind::FinishRun,
?repoid,
"finished run"
);
}
/// Log at debug level the "run" action for an event.
pub fn queueproc_action_run(event: &CiEvent) {
debug!(
msg_id = ?Id::QueueProcActionRun,
kind = %Kind::Debug,
?event,
"Action: run"
);
}
/// Log at info level the "shutdown" action.
pub fn queueproc_action_shutdown() {
info!(
msg_id = ?Id::QueueProcActionShutdown,
kind = %Kind::Debug,
"Action: shutdown"
);
}
/// Log at info level the "terminate CI run" action for the given run ID.
pub fn queueproc_action_terminate(run_id: &RunId) {
info!(
msg_id = ?Id::QueueProcActionTerminate,
kind = %Kind::Debug,
?run_id,
"Action: terminate CI run"
);
}
/// Log at info level that the thread adding node events to the event
/// queue starts.
pub fn queueadd_start() {
info!(
msg_id = ?Id::QueueAddStart,
kind = %Kind::Debug,
"start thread to add events from node to event queue"
);
}
/// Log at debug level that no more events are coming from the node
/// control socket, recording the error that was reported.
pub fn queueadd_control_socket_close(error: &CiEventSourceError) {
debug!(
msg_id = ?Id::QueueAddEndEvents,
kind = %Kind::Debug,
?error,
"no more events from node control socket"
);
}
/// Log at debug level that an event is inserted into the queue, recording
/// the event and its queue ID.
pub fn queueadd_push_event(event: &CiEvent, id: &QueueId) {
debug!(
msg_id = ?Id::QueueAddEnqueueEvent,
kind = %Kind::GotEvent,
?event,
?id,
"insert broker event into queue"
);
}
/// Log that the thread adding node events to the event queue ends.
///
/// The message and fields are the same either way; only the level
/// differs: error if `result` is an error, debug otherwise. The message
/// text names the adder thread, so that it can be told apart from the
/// end message of the event processing thread.
pub fn queueadd_end(result: &Result<(), AdderError>) {
    if result.is_err() {
        error!(
            msg_id = ?Id::QueueAddEnd,
            kind = %Kind::Debug,
            ?result,
            "thread to add events from node to event queue ends"
        );
    } else {
        debug!(
            msg_id = ?Id::QueueAddEnd,
            kind = %Kind::Debug,
            ?result,
            "thread to add events from node to event queue ends"
        );
    }
}
/// Warn that HTML report pages are not written because no output
/// directory has been set.
pub fn pages_directory_unset() {
warn!(
msg_id = ?Id::PagesNoDirSet,
kind = %Kind::Debug,
"not writing HTML report pages as output directory has not been set"
);
}
/// Warn that the HTML report directory does not exist, recording its path.
pub fn pages_directory_does_not_exist(report_dir: &Path) {
warn!(
msg_id = ?Id::PagesDirDoesNotExist,
kind = %Kind::Debug,
?report_dir,
"HTML report directory does not exist"
);
}
/// Log at trace level the interval for waiting between HTML page updates.
///
/// The interval is recorded in whole seconds.
pub fn pages_interval(interval: Duration) {
trace!(
msg_id = ?Id::PagesInterval,
kind = %Kind::Debug,
interval = interval.as_secs(),
"interval for waiting between HTML page updates",
);
}
/// Log at debug level that the page updater's run notification channel
/// disconnected.
pub fn pages_disconnected() {
debug!(
msg_id = ?Id::PagesDisconnected,
kind = %Kind::Debug,
"page updater: run notification channel disconnected"
);
}
/// Log at info level that the page updater thread starts.
pub fn pages_start() {
info!(
msg_id = ?Id::PagesStart,
kind = %Kind::Debug,
"start page updater thread"
);
}
/// Log that the page updater thread ends.
///
/// The message and fields are the same either way; only the level
/// differs: error if `result` is an error, debug otherwise.
pub fn pages_end(result: &Result<(), PageError>) {
    match result {
        Err(_) => {
            error!(
                msg_id = ?Id::PagesEnd,
                kind = %Kind::Debug,
                ?result,
                "end page updater thread"
            );
        }
        Ok(()) => {
            debug!(
                msg_id = ?Id::PagesEnd,
                kind = %Kind::Debug,
                ?result,
                "end page updater thread"
            );
        }
    }
}
/// Log at debug level that the connection to the node control socket broke.
pub fn event_disconnected() {
debug!(
msg_id = ?Id::NodeEventSourceDisconnected,
kind = %Kind::Debug,
"connection to node control socket broke"
);
}
/// Log at debug level that the iterator over node events from the control
/// socket ended.
pub fn event_end() {
debug!(
msg_id = ?Id::NodeEventSourceEnd,
kind = %Kind::Debug,
"no more node events from control socket: iterator ended"
);
}
/// Log at debug level the file name of the broker database.
///
/// The path is recorded via `Path::display`.
pub fn broker_db(filename: &Path) {
debug!(
msg_id = ?Id::BrokerDatabase,
kind = %Kind::Startup,
filename = %filename.display(),
"broker database"
);
}
/// Log at info level that a CI run starts, recording the trigger request
/// and the broker's run ID.
pub fn broker_start_run(trigger: &Request, broker_run_id: &RunId) {
info!(
msg_id = ?Id::BrokerRunStart,
kind = %Kind::StartRun,
?trigger,
%broker_run_id,
"start CI run"
);
}
/// Log at info level that a CI run finished, recording the run and the
/// broker's run ID.
pub fn broker_end_run(run: &Run, broker_run_id: &RunId) {
info!(
msg_id = ?Id::BrokerRunEnd,
kind = %Kind::FinishRun,
?run,
%broker_run_id,
"Finish CI run"
);
}
/// Warn that the patch COB was not found in the repository.
///
/// The message is tagged with the `TriggerCreate` message ID.
pub fn patch_cob_lookup(repo_id: &RepoId, patch_id: &PatchId) {
warn!(
msg_id = ?Id::TriggerCreate,
kind = %Kind::StartRun,
?repo_id,
?patch_id,
"did not find patch COB in repository"
);
}
/// Log at debug level an adapter configuration that is embedded in the
/// cib configuration, recording the file name and the configuration text.
pub fn adapter_temp_config(filename: &Path, config: &str) {
debug!(
msg_id = ?Id::AdapterTempConfig,
kind = %Kind::AdapterMessage,
?filename,
?config,
"adapter config embedded in cib config"
);
}
/// Log at trace level one line of adapter stdout, in its `Debug` form.
pub fn adapter_stdout_line(line: &str) {
trace!(
msg_id = ?Id::AdapterStdoutLine,
kind = %Kind::AdapterMessage,
?line,
"adapter stdout line"
);
}
/// Log at error level that there was no first response message.
pub fn adapter_no_first_response() {
error!(
msg_id = ?Id::AdapterNoFirstMessage,
kind = %Kind::AdapterMessage,
"no first response message"
);
}
/// Log at error level that there was no second response message.
pub fn adapter_no_second_response() {
error!(
msg_id = ?Id::AdapterNoSecondMessage,
kind = %Kind::AdapterMessage,
"no second response message"
);
}
/// Log at error level that there were too many response messages.
pub fn adapter_too_many_responses() {
error!(
msg_id = ?Id::AdapterTooManyMessages,
kind = %Kind::AdapterMessage,
"too many response messages"
);
}
/// Log at debug level one line of adapter stderr, recorded as the
/// `stderr_line` field.
pub fn adapter_stderr_line(line: &str) {
debug!(
msg_id = ?Id::AdapterStderrLine,
kind = %Kind::Debug,
stderr_line = line,
"adapter stderr"
);
}
/// Log at debug level the exit status of the adapter process.
pub fn adapter_result(exit: ExitStatus) {
debug!(
msg_id = ?Id::AdapterExitCode,
kind = %Kind::Debug,
?exit,
"adapter exit code"
);
}
/// Warn that the adapter did not exit voluntarily and was terminated for
/// taking too long.
pub fn adapter_did_not_exit_voluntarily() {
warn!(
msg_id = ?Id::AdapterInvoluntaryExit,
kind = %Kind::Debug,
"adapter did not exit voluntarily: terminated for taking too long"
);
}
/// Warn that the adapter did not exit, recording the timeout error.
pub fn adapter_did_not_exit(error: &TimeoutError) {
warn!(
msg_id = ?Id::AdapterNoExit,
kind = %Kind::Debug,
?error,
"adapter did not exit: probably killed by signal"
);
}
/// Log at info level that the adapter was terminated for taking too long.
pub fn adapter_timed_out() {
info!(
msg_id = ?Id::AdapterTimedOut,
kind = %Kind::FinishRun,
"adapter was terminated due to taking too long"
);
}
/// Log at info level a failure to create or update a job COB.
///
/// `msg` is recorded as a field named `msg`, next to the repository ID,
/// the object ID, and the optional error, all three in `Debug` form.
pub fn job_failure(
    msg: &'static str,
    repo_id: &RepoId,
    oid: &Oid,
    error: Option<&crate::cob::JobError>,
) {
    info!(
        msg_id = ?Id::JobFailure,
        kind = %Kind::StartRun,
        ?repo_id,
        ?oid,
        ?error,
        msg,
        "failed to create or update job COB",
    );
}
/// Log at debug level that a job COB was created, recording the
/// repository ID, the object ID, and the job ID.
pub fn job_create(repo_id: &RepoId, oid: &Oid, job_id: &JobId) {
debug!(
msg_id = ?Id::JobCreate,
kind = %Kind::StartRun,
?repo_id,
?oid,
?job_id,
"created job COB",
);
}
/// Log at debug level that an existing job COB is reused, recording the
/// repository ID, the object ID, and the job ID.
///
/// The message ID is shared with `job_create`; only the message text
/// tells the two apart.
pub fn job_reuse(repo_id: &RepoId, oid: &Oid, job_id: &JobId) {
    debug!(
        msg_id = ?Id::JobCreate,
        kind = %Kind::StartRun,
        ?repo_id,
        ?oid,
        ?job_id,
        "reused existing job COB",
    );
}
/// Log at debug level that a run was created for a job COB, recording the
/// job ID and the run ID.
pub fn job_run_create(job_id: JobId, run_id: Uuid) {
debug!(
msg_id = ?Id::JobRunCreate,
kind = %Kind::StartRun,
?job_id,
?run_id,
"created run for job COB",
);
}
/// Log at debug level that a run was marked finished for a job COB,
/// recording the job ID, the run ID, and the reason.
pub fn job_run_finished(job_id: JobId, run_id: Uuid, reason: Reason) {
debug!(
msg_id = ?Id::JobRunFinished,
kind = %Kind::FinishRun,
?job_id,
?run_id,
?reason,
"marked run finished for job COB",
);
}
/// Log at info level the decision of an event filter.
///
/// `filter` is used as the text of the log message itself; `allowed` and
/// `reason` are recorded as fields.
pub fn event_filter_decision(filter: &'static str, allowed: bool, reason: &str) {
info!(
msg_id = ?Id::FilterDecision,
kind = %Kind::FilterDecision,
allowed,
reason,
"{filter}"
);
}
pub fn error(msg: &str, e: &impl std::error::Error) {
error!("{msg}: {e}");
let mut e = e.source();
while let Some(source) = e {
error!("caused by: {}", source);
e = source.source();
}
}
/// Log at debug level that the named worker starts.
///
/// Like the other messages that carry a `msg_id`, the event also carries
/// a `kind` field.
pub fn worker_start(name: &str) {
    debug!(
        msg_id = ?Id::WorkerStart,
        kind = %Kind::Debug,
        name,
        "worker starts"
    );
}
/// Log at debug level that the named worker ends, recording its result.
///
/// Like the other messages that carry a `msg_id`, the event also carries
/// a `kind` field.
pub fn worker_end(name: &str, result: &Result<(), impl std::error::Error>) {
    debug!(
        msg_id = ?Id::WorkerEnd,
        kind = %Kind::Debug,
        name,
        ?result,
        "worker ends"
    );
}