use clap::Args;
use figment::{
providers::{Env, Format, Serialized, Toml},
value::magic::RelativePathBuf,
Figment,
};
use serde::{Deserialize, Serialize};
use std::io::Write;
use std::ops::Not;
use std::{collections::HashSet, time::Duration};
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_subscriber::{fmt::MakeWriter, layer::SubscriberExt, util::SubscriberInitExt, Layer};
use url::Url;
use crate::error::Error;
use super::{coordinator::DEFAULT_COORDINATOR_ADDR, TracingGuard};
/// Buffered writer adapter that tags log output with a worker identifier.
///
/// Every non-empty line that starts in this writer is prefixed with
/// `[worker:<id>] `. Bare newlines (blank lines) are passed through without a
/// prefix. Output is collected in an in-memory buffer and handed to the wrapped
/// writer when the buffer grows past a threshold, on `flush`, or on drop.
struct WorkerIdWriter<W: Write> {
    /// Destination that ultimately receives the prefixed bytes.
    inner: W,
    /// Pre-rendered `[worker:<id>] ` bytes.
    prefix: Vec<u8>,
    /// `true` when the next byte written begins a new line.
    at_line_start: bool,
    /// Bytes accepted by `write` but not yet forwarded to `inner`.
    buffer: Vec<u8>,
}

impl<W: Write> Drop for WorkerIdWriter<W> {
    /// Forwards whatever is still buffered; failures are ignored because
    /// `drop` has no way to report them.
    fn drop(&mut self) {
        let _ = self.flush_buffer();
    }
}

impl<W: Write> WorkerIdWriter<W> {
    /// Capacity reserved for the internal buffer at construction time.
    const INITIAL_CAPACITY: usize = 8192;
    /// Buffer size (checked after each completed line) above which the
    /// buffer is forwarded to the wrapped writer.
    const FLUSH_THRESHOLD: usize = 4096;

    /// Wraps `inner`, prefixing lines with `[worker:<worker_id>] `.
    fn new(inner: W, worker_id: String) -> Self {
        Self {
            inner,
            prefix: format!("[worker:{worker_id}] ").into_bytes(),
            at_line_start: true,
            buffer: Vec::with_capacity(Self::INITIAL_CAPACITY),
        }
    }

    /// Writes the buffered bytes to the wrapped writer and empties the buffer.
    ///
    /// The buffer is kept intact when the underlying write fails.
    fn flush_buffer(&mut self) -> std::io::Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        self.inner.write_all(&self.buffer)?;
        self.buffer.clear();
        Ok(())
    }
}

impl<W: Write> Write for WorkerIdWriter<W> {
    /// Buffers `buf`, inserting the worker prefix at the start of each
    /// non-empty line, and reports the whole input as consumed.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Each chunk is either a complete line (ending in `\n`) or the
        // trailing, still-unterminated remainder of the input.
        for chunk in buf.split_inclusive(|&byte| byte == b'\n') {
            let ends_line = chunk.last() == Some(&b'\n');
            // A chunk consisting solely of `\n` is a blank line: no prefix.
            let is_bare_newline = ends_line && chunk.len() == 1;

            if self.at_line_start && !is_bare_newline {
                self.buffer.extend_from_slice(&self.prefix);
            }
            self.buffer.extend_from_slice(chunk);
            self.at_line_start = ends_line;

            // The size check only happens on line boundaries.
            if ends_line && self.buffer.len() > Self::FLUSH_THRESHOLD {
                self.flush_buffer()?;
            }
        }
        Ok(buf.len())
    }

    /// Forwards buffered bytes, then flushes the wrapped writer.
    fn flush(&mut self) -> std::io::Result<()> {
        self.flush_buffer()?;
        self.inner.flush()
    }
}
/// `MakeWriter` adapter that wraps every writer produced by `inner` in a
/// `WorkerIdWriter` carrying this worker's identifier.
struct WorkerIdMakeWriter<M> {
    /// Underlying writer factory.
    inner: M,
    /// Identifier rendered into the `[worker:<id>] ` line prefix.
    worker_id: String,
}

impl<'a, M: MakeWriter<'a>> MakeWriter<'a> for WorkerIdMakeWriter<M> {
    type Writer = WorkerIdWriter<M::Writer>;

    /// Obtains a writer from the wrapped factory and decorates it with a
    /// fresh copy of the worker identifier.
    fn make_writer(&'a self) -> Self::Writer {
        let sink = self.inner.make_writer();
        let id = self.worker_id.clone();
        WorkerIdWriter::new(sink, id)
    }
}
/// Fully resolved configuration of a worker.
///
/// Instances are produced by `WorkerConfig::new`, which merges the built-in
/// defaults, TOML configuration files, `MITO_`-prefixed environment variables
/// and command-line flags, and then extracts the result into this struct.
#[derive(Deserialize, Serialize, Debug)]
pub struct WorkerConfig {
/// URL of the coordinator. The built-in default is `http://` followed by
/// `DEFAULT_COORDINATOR_ADDR`.
pub(crate) coordinator_addr: Url,
/// Human-readable duration (handled by `humantime_serde`); defaults to 180 seconds.
/// NOTE(review): presumably the interval between polls of the coordinator — confirm at the usage site.
#[serde(with = "humantime_serde")]
pub(crate) polling_interval: Duration,
/// Human-readable duration (handled by `humantime_serde`); defaults to 300 seconds.
/// NOTE(review): presumably the interval between heartbeats sent to the coordinator — confirm at the usage site.
#[serde(with = "humantime_serde")]
pub(crate) heartbeat_interval: Duration,
/// Optional path to a credential file; may be relative to the config file that set it.
/// NOTE(review): how the file is read is not visible in this part of the source.
pub(crate) credential_path: Option<RelativePathBuf>,
/// Optional user name. NOTE(review): presumably used to authenticate — confirm.
pub(crate) user: Option<String>,
/// Optional password. Note that this struct derives `Debug`, so the value
/// appears in `{:?}` output.
pub(crate) password: Option<String>,
/// Set of group names; empty by default. NOTE(review): semantics are defined elsewhere.
pub(crate) groups: HashSet<String>,
/// Set of tags; empty by default. NOTE(review): semantics are defined elsewhere.
pub(crate) tags: HashSet<String>,
/// Set of labels; empty by default. NOTE(review): semantics are defined elsewhere.
pub(crate) labels: HashSet<String>,
/// Log file location used when `file_log` is enabled. It is split into a
/// parent directory and a file name; when unset or unusable,
/// `setup_tracing_subscriber` falls back to `<cache dir>/mitosis/worker`.
pub(crate) log_path: Option<RelativePathBuf>,
/// Enables the additional file log layer in `setup_tracing_subscriber`.
pub(crate) file_log: bool,
/// When file logging is enabled, selects a daily-rotated log (at most 3 files
/// kept, default name `workers.log`) whose lines are prefixed with
/// `[worker:<id>]`, instead of a single never-rotated `<id>.log` file.
/// Defaults to `false` when absent from the input.
#[serde(default)]
pub(crate) shared_log: bool,
/// Optional human-readable duration (handled by `humantime_serde`).
/// NOTE(review): presumably the maximum lifetime of the worker — confirm at the usage site.
#[serde(with = "humantime_serde")]
pub(crate) lifetime: Option<Duration>,
/// Defaults to `false` when absent from the input.
/// NOTE(review): meaning is not visible in this part of the source.
#[serde(default)]
pub(crate) retain: bool,
/// Defaults to `false` when absent from the input.
/// NOTE(review): presumably disables the Redis integration — confirm at the usage site.
#[serde(default)]
pub(crate) skip_redis: bool,
}
// Command-line overrides for `WorkerConfig`.
//
// The struct is serialized and merged last in `WorkerConfig::new`, so any field
// that is serialized overrides the value coming from defaults, config files and
// environment variables. Every field therefore carries a `skip_serializing_if`
// guard: options that were not given on the command line are left out of the
// serialized form and do not clobber lower-priority sources.
//
// Plain `//` comments are used on purpose: `///` doc comments on a clap-derived
// struct are picked up as help text and would change the `--help` output.
#[derive(Args, Debug, Serialize, Default, Clone)]
#[command(rename_all = "kebab-case")]
pub struct WorkerConfigCli {
// `--config`: path of the TOML file to load; `WorkerConfig::new` uses
// `config.toml` when this is absent.
#[arg(long)]
#[serde(skip_serializing_if = "::std::option::Option::is_none")]
pub config: Option<String>,
// `-c` / `--coordinator`: overrides `coordinator_addr`.
#[arg(short, long = "coordinator")]
#[serde(skip_serializing_if = "::std::option::Option::is_none")]
pub coordinator_addr: Option<String>,
// `--polling-interval`: kept as a string here and parsed when the merged
// configuration is extracted into `WorkerConfig`.
#[arg(long)]
#[serde(skip_serializing_if = "::std::option::Option::is_none")]
pub polling_interval: Option<String>,
// `--heartbeat-interval`: kept as a string here and parsed on extraction.
#[arg(long)]
#[serde(skip_serializing_if = "::std::option::Option::is_none")]
pub heartbeat_interval: Option<String>,
// `--credential-path`: overrides `credential_path`.
#[arg(long)]
#[serde(skip_serializing_if = "::std::option::Option::is_none")]
pub credential_path: Option<String>,
// `-u` / `--user`: overrides `user`.
#[arg(short, long)]
#[serde(skip_serializing_if = "::std::option::Option::is_none")]
pub user: Option<String>,
// `-p` / `--password`: overrides `password`.
#[arg(short, long)]
#[serde(skip_serializing_if = "::std::option::Option::is_none")]
pub password: Option<String>,
// `-g` / `--groups`: zero or more values, also accepted as a comma-separated
// list. An empty list is not serialized and so does not override other sources.
#[arg(short, long, num_args = 0.., value_delimiter = ',')]
#[serde(skip_serializing_if = "::std::vec::Vec::is_empty")]
pub groups: Vec<String>,
// `-t` / `--tags`: zero or more values, also accepted as a comma-separated list.
#[arg(short, long, num_args = 0.., value_delimiter = ',')]
#[serde(skip_serializing_if = "::std::vec::Vec::is_empty")]
pub tags: Vec<String>,
// `-l` / `--labels`: zero or more values, also accepted as a comma-separated list.
#[arg(short, long, num_args = 0.., value_delimiter = ',')]
#[serde(skip_serializing_if = "::std::vec::Vec::is_empty")]
pub labels: Vec<String>,
// `--log-path`: overrides `log_path`.
#[arg(long)]
#[serde(skip_serializing_if = "::std::option::Option::is_none")]
pub log_path: Option<String>,
// `--file-log`: boolean flag. `<&bool>::not` skips serialization when the flag
// is `false`, so an absent flag cannot switch off a value enabled elsewhere.
#[arg(long)]
#[serde(skip_serializing_if = "<&bool>::not")]
pub file_log: bool,
// `--shared-log`: boolean flag; only serialized when set.
#[arg(long)]
#[serde(skip_serializing_if = "<&bool>::not")]
pub shared_log: bool,
// `--lifetime`: kept as a string here and parsed on extraction.
#[arg(long)]
#[serde(skip_serializing_if = "::std::option::Option::is_none")]
pub lifetime: Option<String>,
// `--retain`: boolean flag; only serialized when set.
#[arg(long)]
#[serde(skip_serializing_if = "<&bool>::not")]
pub retain: bool,
// `--skip-redis`: boolean flag; only serialized when set.
#[arg(long)]
#[serde(skip_serializing_if = "<&bool>::not")]
pub skip_redis: bool,
}
impl Default for WorkerConfig {
    /// Built-in baseline configuration, used as the lowest-priority layer
    /// when the configuration is assembled.
    ///
    /// The coordinator URL is `http://` followed by `DEFAULT_COORDINATOR_ADDR`,
    /// polling happens every 180 seconds, heartbeats every 300 seconds, all
    /// optional values are unset, all sets are empty and all flags are off.
    ///
    /// # Panics
    /// Panics if `DEFAULT_COORDINATOR_ADDR` does not form a valid URL.
    fn default() -> Self {
        let coordinator_addr =
            Url::parse(&format!("http://{DEFAULT_COORDINATOR_ADDR}")).unwrap();
        let polling_interval = Duration::from_secs(3 * 60);
        let heartbeat_interval = Duration::from_secs(5 * 60);

        Self {
            coordinator_addr,
            polling_interval,
            heartbeat_interval,
            credential_path: Option::default(),
            user: Option::default(),
            password: Option::default(),
            groups: HashSet::default(),
            tags: HashSet::default(),
            labels: HashSet::default(),
            log_path: Option::default(),
            file_log: false,
            shared_log: false,
            lifetime: Option::default(),
            retain: false,
            skip_redis: false,
        }
    }
}
impl WorkerConfig {
pub fn new(cli: &WorkerConfigCli) -> crate::error::Result<Self> {
let global_config = dirs::config_dir().map(|mut p| {
p.push("mitosis");
p.push("config.toml");
p
});
let mut figment = Figment::new().merge(Serialized::from(Self::default(), "worker"));
if let Some(global_config) = global_config {
if global_config.exists() {
figment = figment.merge(Toml::file(global_config).nested());
}
}
figment = figment
.merge(Toml::file(cli.config.as_deref().unwrap_or("config.toml")).nested())
.merge(Env::prefixed("MITO_").profile("worker"))
.merge(Serialized::from(cli, "worker"))
.select("worker");
Ok(figment.extract()?)
}
pub fn setup_tracing_subscriber<T, U>(&self, worker_id: U) -> crate::error::Result<TracingGuard>
where
T: std::fmt::Display,
U: Into<T>,
{
if self.file_log {
let id = worker_id.into();
let id_str = id.to_string();
let file_logger = if self.shared_log {
self.log_path
.as_ref()
.and_then(|p| {
let path = p.relative();
let dir = path.parent();
let file_name = path.file_name();
match (dir, file_name) {
(Some(dir), Some(file_name)) => {
RollingFileAppender::builder()
.rotation(Rotation::DAILY)
.filename_prefix(file_name.to_string_lossy().to_string())
.max_log_files(3)
.build(dir)
.ok()
}
_ => None,
}
})
.or_else(|| {
dirs::cache_dir()
.map(|mut p| {
p.push("mitosis");
p.push("worker");
p
})
.and_then(|dir| {
RollingFileAppender::builder()
.rotation(Rotation::DAILY)
.filename_prefix("workers.log")
.max_log_files(3)
.build(dir)
.ok()
})
})
.ok_or(Error::ConfigError(Box::new(figment::Error::from(
"log path not valid and cache directory not found",
))))?
} else {
self.log_path
.as_ref()
.and_then(|p| {
let path = p.relative();
let dir = path.parent();
let file_name = path.file_name();
match (dir, file_name) {
(Some(dir), Some(file_name)) => {
Some(tracing_appender::rolling::never(dir, file_name))
}
_ => None,
}
})
.or_else(|| {
dirs::cache_dir()
.map(|mut p| {
p.push("mitosis");
p.push("worker");
p
})
.map(|dir| {
tracing_appender::rolling::never(dir, format!("{id_str}.log"))
})
})
.ok_or(Error::ConfigError(Box::new(figment::Error::from(
"log path not valid and cache directory not found",
))))?
};
let (non_blocking, guard) = tracing_appender::non_blocking(file_logger);
let env_filter = tracing_subscriber::EnvFilter::try_from_env("MITO_FILE_LOG_LEVEL")
.unwrap_or_else(|_| "netmito=info".into());
let coordinator_guard = if self.shared_log {
let worker_writer = WorkerIdMakeWriter {
inner: non_blocking,
worker_id: id_str,
};
tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer().with_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "netmito=info".into()),
),
)
.with(
tracing_subscriber::fmt::layer()
.with_writer(worker_writer)
.with_filter(env_filter),
)
.set_default()
} else {
tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer().with_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "netmito=info".into()),
),
)
.with(
tracing_subscriber::fmt::layer()
.with_writer(non_blocking)
.with_filter(env_filter),
)
.set_default()
};
Ok(TracingGuard {
subscriber_guard: Some(coordinator_guard),
file_guard: Some(guard),
})
} else {
let coordinator_guard = tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer().with_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "netmito=info".into()),
),
)
.set_default();
Ok(TracingGuard {
subscriber_guard: Some(coordinator_guard),
file_guard: None,
})
}
}
}