#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
};
use std::time::Duration;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use modkit::{
ModuleCtx,
config::ConfigProvider,
contracts::{
DatabaseCapability, Module, OpenApiRegistry, RestApiCapability, RunnableCapability,
},
registry::{ModuleRegistry, RegistryBuilder},
runtime::{DbOptions, RunOptions, ShutdownOptions, run},
};
#[allow(dead_code)]
type CallTracker = Arc<Mutex<Vec<String>>>;
#[derive(Default)]
#[allow(dead_code)]
struct TestOpenApiRegistry;
impl OpenApiRegistry for TestOpenApiRegistry {
fn register_operation(&self, _spec: &modkit::api::OperationSpec) {}
fn ensure_schema_raw(
&self,
root_name: &str,
_schemas: Vec<(
String,
utoipa::openapi::RefOr<utoipa::openapi::schema::Schema>,
)>,
) -> String {
root_name.to_owned()
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
#[derive(Clone)]
struct MockConfigProvider {
configs: std::collections::HashMap<String, serde_json::Value>,
}
impl MockConfigProvider {
fn new() -> Self {
Self {
configs: std::collections::HashMap::new(),
}
}
fn with_config(mut self, module_name: &str, config: serde_json::Value) -> Self {
self.configs.insert(module_name.to_owned(), config);
self
}
}
impl ConfigProvider for MockConfigProvider {
fn get_module_config(&self, module_name: &str) -> Option<&serde_json::Value> {
self.configs.get(module_name)
}
}
#[allow(dead_code)]
trait Pipe<T> {
fn pipe<U, F: FnOnce(T) -> U>(self, f: F) -> U;
}
impl<T> Pipe<T> for T {
fn pipe<U, F: FnOnce(T) -> U>(self, f: F) -> U {
f(self)
}
}
#[allow(dead_code)]
#[derive(Clone)]
struct TestModule {
name: String,
calls: CallTracker,
should_fail_init: Arc<AtomicBool>,
should_fail_db: Arc<AtomicBool>,
should_fail_rest: Arc<AtomicBool>,
should_fail_start: Arc<AtomicBool>,
should_fail_stop: Arc<AtomicBool>,
}
#[allow(dead_code)]
impl TestModule {
fn new(name: &str, calls: CallTracker) -> Self {
Self {
name: name.to_owned(),
calls,
should_fail_init: Arc::new(AtomicBool::new(false)),
should_fail_db: Arc::new(AtomicBool::new(false)),
should_fail_rest: Arc::new(AtomicBool::new(false)),
should_fail_start: Arc::new(AtomicBool::new(false)),
should_fail_stop: Arc::new(AtomicBool::new(false)),
}
}
fn fail_init(self) -> Self {
self.should_fail_init.store(true, Ordering::SeqCst);
self
}
fn fail_db(self) -> Self {
self.should_fail_db.store(true, Ordering::SeqCst);
self
}
fn fail_rest(self) -> Self {
self.should_fail_rest.store(true, Ordering::SeqCst);
self
}
fn fail_start(self) -> Self {
self.should_fail_start.store(true, Ordering::SeqCst);
self
}
fn fail_stop(self) -> Self {
self.should_fail_stop.store(true, Ordering::SeqCst);
self
}
}
#[async_trait::async_trait]
impl Module for TestModule {
async fn init(&self, _ctx: &ModuleCtx) -> anyhow::Result<()> {
self.calls
.lock()
.unwrap()
.push(format!("{}.init", self.name));
if self.should_fail_init.load(Ordering::SeqCst) {
anyhow::bail!("Init failed for module {}", self.name);
}
Ok(())
}
}
impl DatabaseCapability for TestModule {
fn migrations(&self) -> Vec<Box<dyn sea_orm_migration::MigrationTrait>> {
self.calls
.lock()
.unwrap()
.push(format!("{}.migrations", self.name));
if self.should_fail_db.load(Ordering::SeqCst) {
vec![Box::new(FailingMigration)]
} else {
vec![]
}
}
}
struct FailingMigration;
impl sea_orm_migration::MigrationName for FailingMigration {
#[allow(clippy::unnecessary_literal_bound)]
fn name(&self) -> &str {
"m000_fail"
}
}
#[async_trait::async_trait]
impl sea_orm_migration::MigrationTrait for FailingMigration {
async fn up(
&self,
_manager: &sea_orm_migration::SchemaManager,
) -> Result<(), sea_orm_migration::sea_orm::DbErr> {
Err(sea_orm_migration::sea_orm::DbErr::Custom(
"intentional migration failure".to_owned(),
))
}
async fn down(
&self,
_manager: &sea_orm_migration::SchemaManager,
) -> Result<(), sea_orm_migration::sea_orm::DbErr> {
Ok(())
}
}
impl RestApiCapability for TestModule {
fn register_rest(
&self,
_ctx: &ModuleCtx,
router: axum::Router,
_openapi: &dyn OpenApiRegistry,
) -> anyhow::Result<axum::Router> {
self.calls
.lock()
.unwrap()
.push(format!("{}.register_rest", self.name));
if self.should_fail_rest.load(Ordering::SeqCst) {
anyhow::bail!("REST registration failed for module {}", self.name);
}
Ok(router)
}
}
#[async_trait::async_trait]
impl RunnableCapability for TestModule {
async fn start(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
self.calls
.lock()
.unwrap()
.push(format!("{}.start", self.name));
if self.should_fail_start.load(Ordering::SeqCst) {
anyhow::bail!("Start failed for module {}", self.name);
}
Ok(())
}
async fn stop(&self, _cancel: CancellationToken) -> anyhow::Result<()> {
self.calls
.lock()
.unwrap()
.push(format!("{}.stop", self.name));
if self.should_fail_stop.load(Ordering::SeqCst) {
anyhow::bail!("Stop failed for module {}", self.name);
}
Ok(())
}
}
#[allow(dead_code)]
fn create_test_registry(modules: Vec<TestModule>) -> anyhow::Result<ModuleRegistry> {
let mut builder = RegistryBuilder::default();
for module in modules {
let module_name = module.name.clone();
let module_name_str: &'static str = Box::leak(module_name.into_boxed_str());
let module = Arc::new(module);
builder.register_core_with_meta(module_name_str, &[], module.clone() as Arc<dyn Module>);
builder.register_db_with_meta(
module_name_str,
module.clone() as Arc<dyn DatabaseCapability>,
);
builder.register_rest_with_meta(
module_name_str,
module.clone() as Arc<dyn RestApiCapability>,
);
builder.register_stateful_with_meta(
module_name_str,
module.clone() as Arc<dyn RunnableCapability>,
);
}
Ok(builder.build_topo_sorted()?)
}
fn create_mock_db_manager() -> Arc<modkit_db::DbManager> {
use figment::{Figment, providers::Serialized};
let figment = Figment::new().merge(Serialized::defaults(serde_json::json!({
"test_module": {
"database": {
"dsn": "sqlite::memory:",
"params": {
"journal_mode": "WAL"
}
}
}
})));
let home_dir = std::path::PathBuf::from("/tmp/test");
Arc::new(modkit_db::DbManager::from_figment(figment, home_dir).unwrap())
}
#[tokio::test]
async fn test_db_phase_failure_stops_lifecycle() {
use modkit::runtime::HostRuntime;
let calls: CallTracker = Arc::new(Mutex::new(Vec::new()));
let failing = TestModule::new("fail_db", calls.clone()).fail_db();
let registry = create_test_registry(vec![failing]).expect("registry build");
let db_manager = {
use figment::{Figment, providers::Serialized};
let figment = Figment::new().merge(Serialized::defaults(serde_json::json!({
"modules": {
"fail_db": {
"database": {
"dsn": "sqlite::memory:",
"pool": { "max_conns": 1 }
}
}
}
})));
let home_dir = std::path::PathBuf::from("/tmp/test");
Arc::new(modkit_db::DbManager::from_figment(figment, home_dir).unwrap())
};
let cancel = CancellationToken::new();
let hr = HostRuntime::new(
registry,
Arc::new(MockConfigProvider::new().with_config(
"fail_db",
serde_json::json!({
"database": { "dsn": "sqlite::memory:", "pool": { "max_conns": 1 } },
"config": {}
}),
)),
DbOptions::Manager(db_manager),
Arc::new(modkit::client_hub::ClientHub::default()),
cancel,
Uuid::new_v4(),
None,
);
let err = hr.run_module_phases().await.unwrap_err();
let chain = format!("{err:#}");
assert!(
chain.contains("intentional migration failure"),
"expected migration failure in error chain, got: {chain}"
);
let events = calls.lock().unwrap().clone();
assert!(events.contains(&"fail_db.migrations".to_owned()));
assert!(
!events.iter().any(|e| {
std::path::Path::new(e)
.extension()
.is_some_and(|ext| ext.eq_ignore_ascii_case("init"))
}),
"init must not run after db phase failure: {events:?}"
);
assert!(
!events.iter().any(|e| e.ends_with(".register_rest")),
"rest must not run after db phase failure: {events:?}"
);
assert!(
!events.iter().any(|e| {
std::path::Path::new(e)
.extension()
.is_some_and(|ext| ext.eq_ignore_ascii_case("start"))
}),
"start must not run after db phase failure: {events:?}"
);
}
#[tokio::test]
async fn test_db_options_none() {
let cancel = CancellationToken::new();
cancel.cancel();
let opts = RunOptions {
instance_id: Uuid::new_v4(),
modules_cfg: Arc::new(MockConfigProvider::new()),
db: DbOptions::None,
shutdown: ShutdownOptions::Token(cancel),
clients: vec![],
oop: None,
shutdown_deadline: None,
};
let result = timeout(Duration::from_millis(100), run(opts)).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_db_options_manager() {
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
cancel_clone.cancel();
});
let opts = RunOptions {
instance_id: Uuid::new_v4(),
modules_cfg: Arc::new(MockConfigProvider::new().with_config(
"test_module",
serde_json::json!({
"database": {
"dsn": "sqlite::memory:"
},
"config": {}
}),
)),
db: DbOptions::Manager(create_mock_db_manager()),
shutdown: ShutdownOptions::Token(cancel),
clients: vec![],
oop: None,
shutdown_deadline: None,
};
let result = timeout(Duration::from_secs(1), run(opts)).await;
assert!(result.is_ok());
let run_result = result.unwrap();
assert!(run_result.is_ok());
}
#[tokio::test]
async fn test_shutdown_options_token() {
let cancel = CancellationToken::new();
let opts = RunOptions {
instance_id: Uuid::new_v4(),
modules_cfg: Arc::new(MockConfigProvider::new()),
db: DbOptions::None,
shutdown: ShutdownOptions::Token(cancel.clone()),
clients: vec![],
oop: None,
shutdown_deadline: None,
};
let runner_handle = tokio::spawn(run(opts));
tokio::time::sleep(Duration::from_millis(10)).await;
cancel.cancel();
let result = timeout(Duration::from_millis(100), runner_handle).await;
assert!(result.is_ok());
let run_result = result.unwrap().unwrap();
assert!(run_result.is_ok());
}
#[tokio::test]
async fn test_shutdown_options_future() {
let (tx, rx) = tokio::sync::oneshot::channel();
let opts = RunOptions {
instance_id: Uuid::new_v4(),
modules_cfg: Arc::new(MockConfigProvider::new()),
db: DbOptions::None,
shutdown: ShutdownOptions::Future(Box::pin(async move {
_ = rx.await;
})),
clients: vec![],
oop: None,
shutdown_deadline: None,
};
let runner_handle = tokio::spawn(run(opts));
tokio::time::sleep(Duration::from_millis(10)).await;
_ = tx.send(());
let result = timeout(Duration::from_millis(100), runner_handle).await;
assert!(result.is_ok());
let run_result = result.unwrap().unwrap();
assert!(run_result.is_ok());
}
#[tokio::test]
async fn test_runner_with_config_provider() {
let cancel = CancellationToken::new();
cancel.cancel();
let config_provider = MockConfigProvider::new().with_config(
"test_module",
serde_json::json!({
"setting1": "value1",
"setting2": 42
}),
);
let opts = RunOptions {
instance_id: Uuid::new_v4(),
modules_cfg: Arc::new(config_provider),
db: DbOptions::None,
shutdown: ShutdownOptions::Token(cancel),
clients: vec![],
oop: None,
shutdown_deadline: None,
};
let result = timeout(Duration::from_millis(100), run(opts)).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_complete_lifecycle_success() {
let cancel = CancellationToken::new();
cancel.cancel();
let opts = RunOptions {
instance_id: Uuid::new_v4(),
modules_cfg: Arc::new(MockConfigProvider::new()),
db: DbOptions::None,
shutdown: ShutdownOptions::Token(cancel),
clients: vec![],
oop: None,
shutdown_deadline: None,
};
let result = run(opts).await;
assert!(result.is_ok());
}
#[test]
fn test_run_options_construction() {
let cancel = CancellationToken::new();
let opts = RunOptions {
instance_id: Uuid::new_v4(),
modules_cfg: Arc::new(MockConfigProvider::new()),
db: DbOptions::None,
shutdown: ShutdownOptions::Token(cancel),
clients: vec![],
oop: None,
shutdown_deadline: None,
};
match opts.db {
DbOptions::None => {}
DbOptions::Manager(_) => panic!("Expected DbOptions::None"),
}
match opts.shutdown {
ShutdownOptions::Token(_) => {}
_ => panic!("Expected ShutdownOptions::Token"),
}
}
#[tokio::test]
async fn test_cancellation_during_startup() {
let cancel = CancellationToken::new();
let opts = RunOptions {
instance_id: Uuid::new_v4(),
modules_cfg: Arc::new(MockConfigProvider::new()),
db: DbOptions::None,
shutdown: ShutdownOptions::Token(cancel.clone()),
clients: vec![],
oop: None,
shutdown_deadline: None,
};
let runner_handle = tokio::spawn(run(opts));
cancel.cancel();
let result = timeout(Duration::from_millis(100), runner_handle).await;
assert!(
result.is_ok(),
"Runner should complete quickly when cancelled"
);
let run_result = result.unwrap().unwrap();
assert!(
run_result.is_ok(),
"Runner should handle cancellation gracefully"
);
}
#[tokio::test]
async fn test_multiple_config_provider_scenarios() {
let cancel = CancellationToken::new();
cancel.cancel();
let empty_config = MockConfigProvider::new();
let opts = RunOptions {
instance_id: Uuid::new_v4(),
modules_cfg: Arc::new(empty_config),
db: DbOptions::None,
shutdown: ShutdownOptions::Token(cancel.clone()),
clients: vec![],
oop: None,
shutdown_deadline: None,
};
let result = run(opts).await;
assert!(result.is_ok(), "Should handle empty config");
let complex_config = MockConfigProvider::new()
.with_config(
"module1",
serde_json::json!({
"setting1": "value1",
"nested": {
"setting2": 42,
"setting3": true
}
}),
)
.with_config(
"module2",
serde_json::json!({
"array_setting": [1, 2, 3],
"string_setting": "test"
}),
);
let cancel2 = CancellationToken::new();
cancel2.cancel();
let opts2 = RunOptions {
instance_id: Uuid::new_v4(),
modules_cfg: Arc::new(complex_config),
db: DbOptions::None,
shutdown: ShutdownOptions::Token(cancel2),
clients: vec![],
oop: None,
shutdown_deadline: None,
};
let result2 = run(opts2).await;
assert!(result2.is_ok(), "Should handle complex config");
}
#[tokio::test]
async fn test_runner_timeout_scenarios() {
let cancel = CancellationToken::new();
let opts = RunOptions {
instance_id: Uuid::new_v4(),
modules_cfg: Arc::new(MockConfigProvider::new()),
db: DbOptions::None,
shutdown: ShutdownOptions::Token(cancel.clone()),
clients: vec![],
oop: None,
shutdown_deadline: None,
};
let runner_handle = tokio::spawn(run(opts));
tokio::time::sleep(Duration::from_millis(10)).await;
cancel.cancel();
let result = timeout(Duration::from_millis(200), runner_handle).await;
assert!(result.is_ok(), "Runner should complete within timeout");
let run_result = result.unwrap().unwrap();
assert!(run_result.is_ok(), "Runner should complete successfully");
}
#[test]
fn test_config_provider_edge_cases() {
let provider = MockConfigProvider::new()
.with_config("test", serde_json::json!(null))
.with_config("empty", serde_json::json!({}))
.with_config(
"complex",
serde_json::json!({
"a": {
"b": {
"c": "deep_value"
}
}
}),
);
let null_config = provider.get_module_config("test");
assert!(null_config.is_some());
assert!(null_config.unwrap().is_null());
let empty_config = provider.get_module_config("empty");
assert!(empty_config.is_some());
assert!(empty_config.unwrap().is_object());
let complex_config = provider.get_module_config("complex");
assert!(complex_config.is_some());
assert!(complex_config.unwrap()["a"]["b"]["c"] == "deep_value");
let missing_config = provider.get_module_config("nonexistent");
assert!(missing_config.is_none());
}