use anyhow::{Context, Result};
use figment::{Figment, providers::Serialized};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use super::config::{
AppConfig, CliArgs, LoggingConfig, MODKIT_MODULE_CONFIG_ENV, RenderedDbConfig,
RenderedModuleConfig,
};
use crate::bootstrap::host::{init_logging_unified, init_panic_tracing};
use crate::runtime::{
ClientRegistration, DbOptions, MODKIT_DIRECTORY_ENDPOINT_ENV, RunOptions, ShutdownOptions, run,
shutdown,
};
use cf_system_sdks::directory::{DirectoryClient, DirectoryGrpcClient};
/// Options controlling how an out-of-process (OoP) module is bootstrapped by
/// [`run_oop_with_options`].
///
/// `Default` seeds `config_path` and `directory_endpoint` from environment
/// variables; see the `Default` impl for the exact fallbacks.
#[derive(Debug, Clone)]
pub struct OopRunOptions {
/// Module name; used as the key into the `modules` config map and sent with
/// every heartbeat.
pub module_name: String,
/// Instance identifier; when `None`, a random v4 UUID is generated at startup.
pub instance_id: Option<Uuid>,
/// Endpoint handed to `DirectoryGrpcClient::connect`.
pub directory_endpoint: String,
/// Optional path of the local configuration file, passed to
/// `AppConfig::load_or_default`.
pub config_path: Option<PathBuf>,
/// Verbosity level forwarded to `AppConfig::apply_cli_overrides`.
pub verbose: u8,
/// When `true`, the configuration is printed as YAML and the bootstrap returns
/// before connecting to the directory service.
pub print_config: bool,
/// Delay, in seconds, between two heartbeats sent to the directory service.
pub heartbeat_interval_secs: u64,
}
impl Default for OopRunOptions {
    /// Builds the default options.
    ///
    /// * `config_path` comes from the `MODKIT_CONFIG_PATH` environment variable
    ///   when it is set (and valid Unicode), otherwise it is `None`.
    /// * `directory_endpoint` comes from the variable named by
    ///   `MODKIT_DIRECTORY_ENDPOINT_ENV`, falling back to
    ///   `http://127.0.0.1:50051`.
    /// * Heartbeats default to one every 5 seconds; everything else is
    ///   empty / disabled.
    fn default() -> Self {
        Self {
            config_path: std::env::var("MODKIT_CONFIG_PATH")
                .map(PathBuf::from)
                .ok(),
            directory_endpoint: match std::env::var(MODKIT_DIRECTORY_ENDPOINT_ENV) {
                Ok(endpoint) => endpoint,
                Err(_) => "http://127.0.0.1:50051".to_owned(),
            },
            module_name: String::new(),
            instance_id: None,
            verbose: 0,
            print_config: false,
            heartbeat_interval_secs: 5,
        }
    }
}
#[tracing::instrument(
level = "debug",
skip(local_config, rendered_config),
fields(
has_rendered = rendered_config.is_some(),
has_local_db = local_config.database.is_some()
)
)]
fn build_oop_config_and_db(
local_config: &AppConfig,
module_name: &str,
rendered_config: Option<&RenderedModuleConfig>,
) -> Result<(AppConfig, LoggingConfig, DbOptions)> {
let home_dir = PathBuf::from(&local_config.server.home_dir);
let final_config = if let Some(rendered) = rendered_config {
let mut config = local_config.clone();
let module_entry = config
.modules
.entry(module_name.to_owned())
.or_insert_with(|| serde_json::json!({}));
if let Some(obj) = module_entry.as_object_mut() {
if !obj.contains_key("config") || obj["config"].is_null() {
obj.insert("config".to_owned(), rendered.config.clone());
}
}
debug!(
module = %module_name,
has_rendered_db = %rendered.database.is_some(),
has_rendered_logging = %rendered.logging.is_some(),
"Using rendered config from master as base, local config as override"
);
config
} else {
debug!(
module = %module_name,
"No rendered config from master, using local config entirely (standalone mode)"
);
local_config.clone()
};
let final_logging = merge_logging_configs(
rendered_config.as_ref().and_then(|r| r.logging.as_ref()),
&local_config.logging,
);
let db_options = build_merged_db_options(
&home_dir,
module_name,
rendered_config.as_ref().and_then(|r| r.database.as_ref()),
local_config,
)?;
Ok((final_config, final_logging, db_options))
}
fn merge_logging_configs(master: Option<&LoggingConfig>, local: &LoggingConfig) -> LoggingConfig {
master
.cloned()
.unwrap_or_default()
.into_iter()
.chain(local.clone())
.collect()
}
fn build_merged_db_options(
home_dir: &Path,
module_name: &str,
rendered_db: Option<&RenderedDbConfig>,
local_config: &AppConfig,
) -> Result<DbOptions> {
let has_rendered_db = rendered_db.is_some_and(|db| db.module.is_some() || db.global.is_some());
let has_local_db = local_config.database.is_some()
|| local_config
.modules
.get(module_name)
.and_then(|m| m.get("database"))
.is_some();
if !has_rendered_db && !has_local_db {
debug!(
module = %module_name,
"No database config available"
);
return Ok(DbOptions::None);
}
let mut merged_config = serde_json::Map::new();
if let Some(rendered) = rendered_db {
if let Some(ref global) = rendered.global {
let global_json = serde_json::to_value(global)
.context("Failed to serialize rendered global db config")?;
merged_config.insert("database".to_owned(), global_json);
}
if let Some(ref module_db) = rendered.module {
let module_db_json = serde_json::to_value(module_db)
.context("Failed to serialize rendered module db config")?;
let mut modules = serde_json::Map::new();
let mut module_entry = serde_json::Map::new();
module_entry.insert("database".to_owned(), module_db_json);
modules.insert(
module_name.to_owned(),
serde_json::Value::Object(module_entry),
);
merged_config.insert("modules".to_owned(), serde_json::Value::Object(modules));
}
}
if let Some(ref local_db) = local_config.database {
let local_db_json =
serde_json::to_value(local_db).context("Failed to serialize local global db config")?;
if let Some(existing) = merged_config.get_mut("database") {
merge_json_objects(existing, &local_db_json);
} else {
merged_config.insert("database".to_owned(), local_db_json);
}
}
if let Some(local_module) = local_config.modules.get(module_name)
&& let Some(local_module_db) = local_module.get("database")
{
let modules = merged_config
.entry("modules".to_owned())
.or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
if let Some(modules_obj) = modules.as_object_mut() {
let module_entry = modules_obj
.entry(module_name.to_owned())
.or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
if let Some(module_obj) = module_entry.as_object_mut() {
if let Some(existing_db) = module_obj.get_mut("database") {
merge_json_objects(existing_db, local_module_db);
} else {
module_obj.insert("database".to_owned(), local_module_db.clone());
}
}
}
}
debug!(
module = %module_name,
has_rendered = %rendered_db.is_some(),
has_local_global = %local_config.database.is_some(),
"Building DbManager with merged config"
);
let figment = Figment::new().merge(Serialized::defaults(serde_json::Value::Object(
merged_config,
)));
let db_manager = Arc::new(
modkit_db::DbManager::from_figment(figment, home_dir.to_path_buf())
.context("Failed to create DbManager from merged config")?,
);
Ok(DbOptions::Manager(db_manager))
}
fn merge_json_objects(target: &mut serde_json::Value, source: &serde_json::Value) {
if let (Some(target_obj), Some(source_obj)) = (target.as_object_mut(), source.as_object()) {
for (key, value) in source_obj {
if let Some(target_value) = target_obj.get_mut(key) {
if target_value.is_object() && value.is_object() {
merge_json_objects(target_value, value);
} else {
*target_value = value.clone();
}
} else {
target_obj.insert(key.clone(), value.clone());
}
}
} else {
*target = source.clone();
}
}
/// Bootstraps an out-of-process (OoP) module and runs it until it stops.
///
/// The steps are, in order:
/// 1. Spawn a task that cancels the root token once a shutdown signal arrives.
/// 2. Load the local `AppConfig` (from `opts.config_path`, or defaults) and
///    apply the verbosity override.
/// 3. Read the rendered module configuration from the environment variable
///    named by `MODKIT_MODULE_CONFIG_ENV`; a value that fails to parse is
///    ignored (a warning is logged once logging is up).
/// 4. Merge both configurations, then initialise telemetry (with the `otel`
///    feature), logging and panic tracing.
/// 5. If `opts.print_config` is set, print the effective configuration and
///    return without connecting anywhere.
/// 6. Connect to the directory service, start the heartbeat loop and run the
///    module lifecycle. The heartbeat loop is stopped as soon as the lifecycle
///    returns.
///
/// # Errors
///
/// Returns an error when the local configuration cannot be loaded, the merged
/// database configuration is invalid, tracing initialisation fails (`otel`
/// feature), the directory service cannot be reached, or the module runtime
/// itself fails.
#[tracing::instrument(
    level = "info",
    name = "oop_bootstrap",
    skip(opts),
    fields(
        module = %opts.module_name,
        directory = %opts.directory_endpoint
    )
)]
pub async fn run_oop_with_options(opts: OopRunOptions) -> Result<()> {
    let instance_id = opts.instance_id.unwrap_or_else(Uuid::new_v4);

    // Root cancellation token: cancelled by the signal task, observed by the
    // runtime and (through a child token) by the heartbeat loop.
    let cancel = CancellationToken::new();
    let cancel_for_signals = cancel.clone();
    tokio::spawn(async move {
        match shutdown::wait_for_shutdown().await {
            Ok(()) => {
                info!(target: "", "------------------");
                info!("shutdown: signal received in OoP bootstrap");
            }
            Err(e) => {
                warn!(
                    error = %e,
                    "shutdown: primary waiter failed in OoP bootstrap, falling back to ctrl_c()"
                );
                _ = tokio::signal::ctrl_c().await;
            }
        }
        cancel_for_signals.cancel();
    });

    let args = CliArgs {
        config: opts
            .config_path
            .as_ref()
            .map(|p| p.to_string_lossy().to_string()),
        print_config: opts.print_config,
        verbose: opts.verbose,
        mock: false,
    };
    let mut config = AppConfig::load_or_default(opts.config_path.as_ref())?;
    config.apply_cli_overrides(args.verbose);

    // Best effort: an unparsable rendered config falls back to local-only mode.
    // The outcome is reported further down, once logging is initialised.
    let rendered_config = match std::env::var(MODKIT_MODULE_CONFIG_ENV) {
        Ok(json) => RenderedModuleConfig::from_json(&json).ok(),
        Err(_) => None,
    };
    let (final_config, merged_logging, db_options) =
        build_oop_config_and_db(&config, &opts.module_name, rendered_config.as_ref())?;

    #[cfg(feature = "otel")]
    let otel_cfg = rendered_config
        .as_ref()
        .and_then(|rc| rc.opentelemetry.as_ref());
    #[cfg(feature = "otel")]
    let otel_layer = otel_cfg
        .filter(|cfg| cfg.tracing.enabled)
        .map(crate::telemetry::init_tracing)
        .transpose()?;
    #[cfg(not(feature = "otel"))]
    let otel_layer = None;
    // A metrics failure is kept aside so it can be logged after logging is set up.
    #[cfg(feature = "otel")]
    let metrics_init_error = otel_cfg
        .filter(|cfg| cfg.metrics.enabled)
        .and_then(|cfg| crate::telemetry::init::init_metrics_provider(cfg).err());

    init_logging_unified(&merged_logging, &config.server.home_dir, otel_layer);
    #[cfg(feature = "otel")]
    if let Some(e) = metrics_init_error {
        tracing::error!(error = %e, "OpenTelemetry metrics not initialized (OoP)");
    }
    init_panic_tracing();

    if let Some(ref rc) = rendered_config {
        info!(
            env_var = MODKIT_MODULE_CONFIG_ENV,
            has_database = rc.database.is_some(),
            has_config = !rc.config.is_null(),
            has_logging = rc.logging.is_some(),
            has_opentelemetry = rc.opentelemetry.is_some(),
            "Received rendered config from master host"
        );
    } else if std::env::var(MODKIT_MODULE_CONFIG_ENV).is_ok() {
        warn!(
            env_var = MODKIT_MODULE_CONFIG_ENV,
            "Failed to parse rendered config from master host, using local config only"
        );
    } else {
        debug!(
            env_var = MODKIT_MODULE_CONFIG_ENV,
            "No rendered config from master host, using local config only"
        );
    }

    info!(
        module = %opts.module_name,
        instance_id = %instance_id,
        directory_endpoint = %opts.directory_endpoint,
        "OoP module bootstrap starting"
    );

    if opts.print_config {
        // Show the configuration the module would actually run with, i.e. the
        // local config including the section rendered by the master host.
        print_config(&final_config);
        return Ok(());
    }

    info!(
        "Connecting to directory service at {}",
        opts.directory_endpoint
    );
    let directory_client = DirectoryGrpcClient::connect(&opts.directory_endpoint)
        .await
        .with_context(|| {
            format!(
                "Failed to connect to directory service at {}",
                opts.directory_endpoint
            )
        })?;
    let directory_api: Arc<dyn DirectoryClient> = Arc::new(directory_client);
    info!("Successfully connected to directory service");

    // A zero interval would make `sleep` return immediately and turn the
    // heartbeat loop into a busy loop hammering the directory service.
    let heartbeat_interval_secs = opts.heartbeat_interval_secs.max(1);
    if heartbeat_interval_secs != opts.heartbeat_interval_secs {
        warn!(
            configured_secs = opts.heartbeat_interval_secs,
            effective_secs = heartbeat_interval_secs,
            "Heartbeat interval must be at least 1 second, using the minimum"
        );
    }
    let heartbeat_interval = Duration::from_secs(heartbeat_interval_secs);

    let heartbeat_directory = Arc::clone(&directory_api);
    let heartbeat_module = opts.module_name.clone();
    let heartbeat_instance_id_str = instance_id.to_string();
    // Child token: stops on global shutdown, and can be cancelled on its own
    // (via `heartbeat_stop`) without affecting the root token.
    let heartbeat_cancel = cancel.child_token();
    let heartbeat_stop = heartbeat_cancel.clone();
    tokio::spawn(async move {
        info!(
            interval_secs = heartbeat_interval_secs,
            "Starting heartbeat loop"
        );
        loop {
            tokio::select! {
                () = heartbeat_cancel.cancelled() => {
                    info!("Heartbeat loop stopping due to cancellation");
                    break;
                }
                () = sleep(heartbeat_interval) => {
                    match heartbeat_directory
                        .send_heartbeat(&heartbeat_module, &heartbeat_instance_id_str)
                        .await
                    {
                        Ok(()) => {
                            tracing::debug!("Heartbeat sent successfully");
                        }
                        Err(e) => {
                            warn!(error = %e, "Failed to send heartbeat, will retry");
                        }
                    }
                }
            }
        }
    });

    let config_provider = Arc::new(final_config);
    info!("Starting module lifecycle");
    let run_options = RunOptions {
        modules_cfg: config_provider,
        db: db_options,
        shutdown: ShutdownOptions::Token(cancel.clone()),
        clients: vec![ClientRegistration::new::<dyn DirectoryClient>(
            directory_api,
        )],
        instance_id,
        oop: None,
        shutdown_deadline: None,
    };
    let result = run(run_options).await;

    // The runtime is gone: stop advertising this instance as alive, even when
    // `run` returned on its own (e.g. with an error) rather than on a signal.
    heartbeat_stop.cancel();

    if let Err(ref e) = result {
        error!(error = %e, "Module runtime failed");
    } else {
        info!("Module runtime completed successfully");
    }
    result
}
/// Writes `config` to stdout as YAML.
///
/// When the YAML rendering fails, the error is reported on stderr instead and
/// nothing is written to stdout.
// Printing is the whole purpose of this helper, hence the lint exemption.
#[allow(unknown_lints, de1301_no_print_macros)]
fn print_config(config: &AppConfig) {
    let yaml = match config.to_yaml() {
        Ok(rendered) => rendered,
        Err(e) => {
            eprintln!("Failed to render config as YAML: {e}");
            return;
        }
    };
    println!("{yaml}");
}
// Unit tests for this module are kept in the sibling file `oop_tests.rs`.
// They are only compiled for test builds and are excluded from coverage
// instrumentation when the `coverage_nightly` cfg is set.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
#[path = "oop_tests.rs"]
mod tests;