use std::sync::{Arc, Mutex, OnceLock};
use async_trait::async_trait;
use authn_resolver_sdk::{AuthNResolverClient, ClientCredentialsRequest};
use authz_resolver_sdk::AuthZResolverClient;
use mini_chat_sdk::{MiniChatAuditPluginSpecV1, MiniChatModelPolicyPluginSpecV1};
use modkit::api::OpenApiRegistry;
use modkit::contracts::RunnableCapability;
use modkit::{DatabaseCapability, Module, ModuleCtx, RestApiCapability};
use std::time::Duration;
use modkit_db::outbox::{LeaseConfig, Outbox, OutboxHandle, Partitions};
use oagw_sdk::ServiceGatewayClientV1;
use sea_orm_migration::MigrationTrait;
use tokio_util::sync::CancellationToken;
use tracing::info;
use types_registry_sdk::{RegisterResult, TypesRegistryClient};
use crate::api::rest::routes;
use crate::background_workers::{self, WORKER_STOP_TIMEOUT, WorkerConfigs};
use crate::config::ProviderEntry;
use crate::domain::ports::MiniChatMetricsPort;
use crate::domain::service::{AppServices as GenericAppServices, Repositories};
use crate::infra::metrics::MiniChatMetricsMeter;
use crate::infra::outbox::{AuditEventHandler, InfraOutboxEnqueuer, UsageEventHandler};
use crate::infra::workers::WorkerHandles;
pub(crate) type AppServices = GenericAppServices<
TurnRepository,
MessageRepository,
QuotaUsageRepository,
ReactionRepository,
ChatRepository,
ThreadSummaryRepository,
AttachmentRepository,
VectorStoreRepository,
MessageAttachmentRepository,
>;
use crate::infra::audit_gateway::AuditGateway;
use crate::infra::db::repo::attachment_repo::AttachmentRepository;
use crate::infra::db::repo::chat_repo::ChatRepository;
use crate::infra::db::repo::message_attachment_repo::MessageAttachmentRepository;
use crate::infra::db::repo::message_repo::MessageRepository;
use crate::infra::db::repo::quota_usage_repo::QuotaUsageRepository;
use crate::infra::db::repo::reaction_repo::ReactionRepository;
use crate::infra::db::repo::thread_summary_repo::ThreadSummaryRepository;
use crate::infra::db::repo::turn_repo::TurnRepository;
use crate::infra::db::repo::vector_store_repo::VectorStoreRepository;
use crate::infra::llm::provider_resolver::ProviderResolver;
use crate::infra::model_policy::ModelPolicyGateway;
pub const DEFAULT_URL_PREFIX: &str = "/mini-chat";
#[modkit::module(
name = "mini-chat",
deps = ["types-registry", "authn-resolver", "authz-resolver", "oagw"],
capabilities = [db, rest, stateful],
)]
pub struct MiniChatModule {
service: OnceLock<Arc<AppServices>>,
url_prefix: OnceLock<String>,
outbox_handle: Mutex<Option<OutboxHandle>>,
oagw_deferred: OnceLock<OagwDeferred>,
worker_configs: OnceLock<WorkerConfigs>,
worker_cancel: Mutex<Option<CancellationToken>>,
worker_handles: Mutex<Option<WorkerHandles>>,
outbox_deferred: OnceLock<OutboxDeferred>,
}
struct OagwDeferred {
gateway: Arc<dyn ServiceGatewayClientV1>,
authn: Arc<dyn AuthNResolverClient>,
client_credentials: crate::config::ClientCredentialsConfig,
providers: std::collections::HashMap<String, ProviderEntry>,
}
struct OutboxDeferred {
db: Arc<crate::domain::service::DbProvider>,
outbox_config: crate::config::OutboxConfig,
cleanup_config: crate::config::background::CleanupWorkerConfig,
model_policy_gw: Arc<ModelPolicyGateway>,
audit_gateway: Arc<AuditGateway>,
file_storage: Arc<dyn crate::domain::ports::FileStorageProvider>,
vector_store_prov: Arc<dyn crate::domain::ports::VectorStoreProvider>,
metrics: Arc<dyn MiniChatMetricsPort>,
enqueuer: Arc<InfraOutboxEnqueuer>,
provider_resolver: Arc<crate::infra::llm::provider_resolver::ProviderResolver>,
model_resolver: Arc<dyn crate::domain::repos::ModelResolver>,
thread_summary_config: crate::config::background::ThreadSummaryWorkerConfig,
}
impl Default for MiniChatModule {
fn default() -> Self {
Self {
service: OnceLock::new(),
url_prefix: OnceLock::new(),
outbox_handle: Mutex::new(None),
oagw_deferred: OnceLock::new(),
worker_configs: OnceLock::new(),
worker_cancel: Mutex::new(None),
worker_handles: Mutex::new(None),
outbox_deferred: OnceLock::new(),
}
}
}
#[allow(clippy::too_many_lines)]
#[async_trait]
impl Module for MiniChatModule {
async fn init(&self, ctx: &ModuleCtx) -> anyhow::Result<()> {
info!("Initializing {} module", Self::MODULE_NAME);
let mut cfg: crate::config::MiniChatConfig = ctx.config_expanded_or_default()?;
cfg.streaming
.validate()
.map_err(|e| anyhow::anyhow!("streaming config: {e}"))?;
cfg.estimation_budgets
.validate()
.map_err(|e| anyhow::anyhow!("estimation_budgets config: {e}"))?;
cfg.quota
.validate()
.map_err(|e| anyhow::anyhow!("quota config: {e}"))?;
cfg.outbox
.validate()
.map_err(|e| anyhow::anyhow!("outbox config: {e}"))?;
cfg.context
.validate()
.map_err(|e| anyhow::anyhow!("context config: {e}"))?;
cfg.client_credentials
.validate()
.map_err(|e| anyhow::anyhow!("client_credentials config: {e}"))?;
for (id, entry) in &cfg.providers {
entry
.validate(id)
.map_err(|e| anyhow::anyhow!("providers config: {e}"))?;
}
cfg.orphan_watchdog
.validate()
.map_err(|e| anyhow::anyhow!("orphan_watchdog config: {e}"))?;
cfg.thread_summary_worker
.validate()
.map_err(|e| anyhow::anyhow!("thread_summary_worker config: {e}"))?;
cfg.cleanup_worker
.validate()
.map_err(|e| anyhow::anyhow!("cleanup_worker config: {e}"))?;
cfg.thumbnail
.validate()
.map_err(|e| anyhow::anyhow!("thumbnail config: {e}"))?;
cfg.rag
.validate()
.map_err(|e| anyhow::anyhow!("rag config: {e}"))?;
let vendor = cfg.vendor.trim().to_owned();
if vendor.is_empty() {
return Err(anyhow::anyhow!(
"{}: vendor must be a non-empty string",
Self::MODULE_NAME
));
}
let registry = ctx.client_hub().get::<dyn TypesRegistryClient>()?;
register_plugin_schemas(
&*registry,
&[
(
MiniChatModelPolicyPluginSpecV1::gts_schema_with_refs_as_string(),
MiniChatModelPolicyPluginSpecV1::gts_schema_id(),
"model-policy",
),
(
MiniChatAuditPluginSpecV1::gts_schema_with_refs_as_string(),
MiniChatAuditPluginSpecV1::gts_schema_id(),
"audit",
),
],
)
.await?;
self.url_prefix
.set(cfg.url_prefix)
.map_err(|_| anyhow::anyhow!("{} url_prefix already set", Self::MODULE_NAME))?;
let db_provider = ctx.db_required()?;
let db = Arc::new(db_provider);
let model_policy_gw = Arc::new(ModelPolicyGateway::new(ctx.client_hub(), vendor.clone()));
let audit_gateway = Arc::new(AuditGateway::new(ctx.client_hub(), vendor));
let authz = ctx
.client_hub()
.get::<dyn AuthZResolverClient>()
.map_err(|e| anyhow::anyhow!("failed to get AuthZ resolver: {e}"))?;
let authn_client = ctx
.client_hub()
.get::<dyn AuthNResolverClient>()
.map_err(|e| anyhow::anyhow!("failed to get AuthN resolver: {e}"))?;
let gateway = ctx
.client_hub()
.get::<dyn ServiceGatewayClientV1>()
.map_err(|e| anyhow::anyhow!("failed to get OAGW gateway: {e}"))?;
for entry in cfg.providers.values_mut() {
if entry.upstream_alias.is_none() {
entry.upstream_alias = Some(entry.host.clone());
}
for ovr in entry.tenant_overrides.values_mut() {
if ovr.upstream_alias.is_none()
&& let Some(ref h) = ovr.host
{
ovr.upstream_alias = Some(h.clone());
}
}
}
drop(self.oagw_deferred.set(OagwDeferred {
gateway: Arc::clone(&gateway),
authn: Arc::clone(&authn_client),
client_credentials: cfg.client_credentials.clone(),
providers: cfg.providers.clone(),
}));
let provider_resolver = Arc::new(ProviderResolver::new(&gateway, cfg.providers));
let repos = Repositories {
chat: Arc::new(ChatRepository::new(modkit_db::odata::LimitCfg {
default: 20,
max: 100,
})),
attachment: Arc::new(AttachmentRepository),
message: Arc::new(MessageRepository::new(modkit_db::odata::LimitCfg {
default: 20,
max: 100,
})),
quota: Arc::new(QuotaUsageRepository),
turn: Arc::new(TurnRepository),
reaction: Arc::new(ReactionRepository),
thread_summary: Arc::new(ThreadSummaryRepository),
vector_store: Arc::new(VectorStoreRepository),
message_attachment: Arc::new(MessageAttachmentRepository),
};
let rag_client = Arc::new(
crate::infra::llm::providers::rag_http_client::RagHttpClient::new(Arc::clone(&gateway)),
);
let mut file_impls: std::collections::HashMap<
String,
Arc<dyn crate::domain::ports::FileStorageProvider>,
> = std::collections::HashMap::new();
let mut vs_impls: std::collections::HashMap<
String,
Arc<dyn crate::domain::ports::VectorStoreProvider>,
> = std::collections::HashMap::new();
for (provider_id, entry) in provider_resolver.entries() {
let (file, vs): (
Arc<dyn crate::domain::ports::FileStorageProvider>,
Arc<dyn crate::domain::ports::VectorStoreProvider>,
) = match entry.storage_kind {
crate::config::StorageKind::Azure => {
let api_version = entry.api_version.clone().unwrap_or_else(|| {
panic!(
"provider '{provider_id}': storage_kind is 'azure' \
but api_version is not set"
)
});
(
Arc::new(
crate::infra::llm::providers::azure_file_storage::AzureFileStorage::new(
Arc::clone(&rag_client),
Arc::clone(&provider_resolver),
api_version.clone(),
),
),
Arc::new(
crate::infra::llm::providers::azure_vector_store::AzureVectorStore::new(
Arc::clone(&rag_client),
Arc::clone(&provider_resolver),
api_version,
),
),
)
}
crate::config::StorageKind::OpenAi => (
Arc::new(
crate::infra::llm::providers::openai_file_storage::OpenAiFileStorage::new(
Arc::clone(&rag_client),
Arc::clone(&provider_resolver),
),
),
Arc::new(
crate::infra::llm::providers::openai_vector_store::OpenAiVectorStore::new(
Arc::clone(&rag_client),
Arc::clone(&provider_resolver),
),
),
),
};
file_impls.insert(provider_id.clone(), file);
vs_impls.insert(provider_id.clone(), vs);
}
let file_storage: Arc<dyn crate::domain::ports::FileStorageProvider> = Arc::new(
crate::infra::llm::providers::dispatching_storage::DispatchingFileStorage::new(
file_impls,
),
);
let vector_store_prov: Arc<dyn crate::domain::ports::VectorStoreProvider> = Arc::new(
crate::infra::llm::providers::dispatching_storage::DispatchingVectorStore::new(
vs_impls,
),
);
let metrics_prefix = cfg.metrics.effective_prefix(Self::MODULE_NAME);
let scope =
opentelemetry::InstrumentationScope::builder(Self::MODULE_NAME.to_owned()).build();
let metrics: Arc<dyn MiniChatMetricsPort> = Arc::new(MiniChatMetricsMeter::new(
&opentelemetry::global::meter_with_scope(scope),
&metrics_prefix,
));
let outbox_enqueuer = Arc::new(InfraOutboxEnqueuer::new(
cfg.outbox.queue_name.clone(),
cfg.outbox.cleanup_queue_name.clone(),
cfg.outbox.chat_cleanup_queue_name.clone(),
cfg.outbox.thread_summary_queue_name.clone(),
cfg.outbox.audit_queue_name.clone(),
cfg.outbox.num_partitions,
));
drop(self.outbox_deferred.set(OutboxDeferred {
db: Arc::clone(&db),
outbox_config: cfg.outbox,
cleanup_config: cfg.cleanup_worker,
model_policy_gw: model_policy_gw.clone(),
audit_gateway: Arc::clone(&audit_gateway),
file_storage: Arc::clone(&file_storage),
vector_store_prov: Arc::clone(&vector_store_prov),
metrics: Arc::clone(&metrics),
enqueuer: Arc::clone(&outbox_enqueuer),
provider_resolver: Arc::clone(&provider_resolver),
model_resolver: model_policy_gw.clone() as Arc<dyn crate::domain::repos::ModelResolver>,
thread_summary_config: cfg.thread_summary_worker.clone(),
}));
let services = Arc::new(AppServices::new(
&repos,
db,
authz,
&(model_policy_gw.clone() as Arc<dyn crate::domain::repos::ModelResolver>),
&provider_resolver,
cfg.streaming,
model_policy_gw.clone() as Arc<dyn crate::domain::repos::PolicySnapshotProvider>,
model_policy_gw as Arc<dyn crate::domain::repos::UserLimitsProvider>,
cfg.estimation_budgets,
cfg.quota,
&(outbox_enqueuer as Arc<dyn crate::domain::repos::OutboxEnqueuer>),
cfg.context,
file_storage,
vector_store_prov,
cfg.rag,
cfg.thumbnail,
metrics,
cfg.thread_summary_worker,
));
self.service
.set(services)
.map_err(|_| anyhow::anyhow!("{} module already initialized", Self::MODULE_NAME))?;
self.worker_configs
.set(WorkerConfigs {
orphan_watchdog: cfg.orphan_watchdog,
})
.map_err(|_| anyhow::anyhow!("{} worker_configs already set", Self::MODULE_NAME))?;
info!("{} module initialized successfully", Self::MODULE_NAME);
Ok(())
}
}
impl DatabaseCapability for MiniChatModule {
fn migrations(&self) -> Vec<Box<dyn MigrationTrait>> {
use sea_orm_migration::MigratorTrait;
info!("Providing mini-chat database migrations");
let mut m = crate::infra::db::migrations::Migrator::migrations();
m.extend(modkit_db::outbox::outbox_migrations());
m
}
}
impl RestApiCapability for MiniChatModule {
fn register_rest(
&self,
_ctx: &ModuleCtx,
router: axum::Router,
openapi: &dyn OpenApiRegistry,
) -> anyhow::Result<axum::Router> {
let services = self
.service
.get()
.ok_or_else(|| anyhow::anyhow!("{} not initialized", Self::MODULE_NAME))?;
info!("Registering mini-chat REST routes");
let prefix = self
.url_prefix
.get()
.ok_or_else(|| anyhow::anyhow!("{} not initialized (url_prefix)", Self::MODULE_NAME))?;
let router = routes::register_routes(router, openapi, Arc::clone(services), prefix);
info!("Mini-chat REST routes registered successfully");
Ok(router)
}
}
#[async_trait]
impl RunnableCapability for MiniChatModule {
async fn start(&self, cancel: CancellationToken) -> anyhow::Result<()> {
let wc = self.worker_configs.get().ok_or_else(|| {
anyhow::anyhow!(
"{} worker_configs not set - init() must run before start()",
Self::MODULE_NAME
)
})?;
let leader_elector = background_workers::prepare_worker_runtime(wc).await?;
if let Some(deferred) = self.oagw_deferred.get() {
let ctx =
exchange_client_credentials(&deferred.authn, &deferred.client_credentials).await?;
let mut providers = deferred.providers.clone();
crate::infra::oagw_provisioning::register_oagw_upstreams(
&deferred.gateway,
&ctx,
&mut providers,
)
.await?;
}
if let Some(od) = self.outbox_deferred.get() {
let outbox_db = od.db.db();
let num_partitions = od.outbox_config.num_partitions;
let max_cleanup_attempts = od.cleanup_config.max_attempts;
let partitions = Partitions::of(
u16::try_from(num_partitions)
.map_err(|_| anyhow::anyhow!("num_partitions exceeds u16"))?,
);
let outbox_handle = Outbox::builder(outbox_db)
.queue(&od.outbox_config.queue_name, partitions)
.leased(UsageEventHandler {
plugin_provider: od.model_policy_gw.clone(),
})
.queue(&od.outbox_config.cleanup_queue_name, partitions)
.leased(
crate::infra::workers::cleanup_worker::AttachmentCleanupHandler::new(
Arc::clone(&od.file_storage),
Arc::clone(&od.db),
ChatRepository::new(modkit_db::odata::LimitCfg {
default: 20,
max: 100,
}),
max_cleanup_attempts,
Arc::clone(&od.metrics),
),
)
.queue(&od.outbox_config.chat_cleanup_queue_name, partitions)
.leased(
crate::infra::workers::cleanup_worker::ChatCleanupHandler::new(
Arc::clone(&od.file_storage),
Arc::clone(&od.vector_store_prov),
Arc::clone(&od.db),
ChatRepository::new(modkit_db::odata::LimitCfg {
default: 20,
max: 100,
}),
max_cleanup_attempts,
Arc::clone(&od.metrics),
),
)
.queue(&od.outbox_config.thread_summary_queue_name, partitions)
.leased(
crate::infra::workers::thread_summary_worker::ThreadSummaryHandler::new(
Arc::new(
crate::infra::workers::thread_summary_worker::ThreadSummaryDeps {
db: Arc::clone(&od.db),
thread_summary_repo: Arc::new(ThreadSummaryRepository),
message_repo: Arc::new(MessageRepository::new(
modkit_db::odata::LimitCfg {
default: 20,
max: 100,
},
)),
outbox_enqueuer: Arc::clone(&od.enqueuer)
as Arc<dyn crate::domain::repos::OutboxEnqueuer>,
metrics: Arc::clone(&od.metrics),
provider_resolver: Arc::clone(&od.provider_resolver),
model_resolver: Arc::clone(&od.model_resolver),
config: od.thread_summary_config.clone(),
},
),
),
)
.queue(&od.outbox_config.audit_queue_name, partitions)
.leased(AuditEventHandler {
audit_gateway: Arc::clone(&od.audit_gateway),
metrics: Arc::clone(&od.metrics),
})
.lease(LeaseConfig {
duration: Duration::from_mins(1),
..LeaseConfig::default()
})
.start()
.await
.map_err(|e| anyhow::anyhow!("outbox start: {e}"))?;
od.enqueuer.set_outbox(Arc::clone(outbox_handle.outbox()));
let mut guard = self
.outbox_handle
.lock()
.map_err(|e| anyhow::anyhow!("outbox_handle lock: {e}"))?;
*guard = Some(outbox_handle);
info!("Outbox pipeline started (OAGW ready)");
}
let orphan_deps = if wc.orphan_watchdog.enabled {
let services = self.service.get().ok_or_else(|| {
anyhow::anyhow!(
"{} not initialized - init() must run before start()",
Self::MODULE_NAME
)
})?;
Some(crate::infra::workers::orphan_watchdog::OrphanWatchdogDeps {
finalization_svc: Arc::clone(&services.finalization),
turn_repo: Arc::clone(&services.turn_repo),
db: Arc::clone(&services.db),
metrics: Arc::clone(&services.metrics),
})
} else {
None
};
let (handles, worker_cancel) =
background_workers::spawn_workers(wc, &cancel, leader_elector.as_ref(), orphan_deps)?;
self.store_worker_runtime(handles, worker_cancel).await?;
Ok(())
}
async fn stop(&self, cancel: CancellationToken) -> anyhow::Result<()> {
if let Some(worker_cancel) = self
.worker_cancel
.lock()
.map_err(|e| anyhow::anyhow!("worker_cancel lock: {e}"))?
.take()
{
worker_cancel.cancel();
}
let workers = self
.worker_handles
.lock()
.map_err(|e| anyhow::anyhow!("worker_handles lock: {e}"))?
.take();
if let Some(handles) = workers {
info!("Waiting for background workers to stop");
handles.join_all(cancel.clone(), WORKER_STOP_TIMEOUT).await;
info!("Background workers stopped");
}
let handle = self
.outbox_handle
.lock()
.map_err(|e| anyhow::anyhow!("outbox_handle lock: {e}"))?
.take();
if let Some(handle) = handle {
info!("Stopping outbox pipeline");
tokio::select! {
() = handle.stop() => {
info!("Outbox pipeline stopped");
}
() = cancel.cancelled() => {
info!("Outbox pipeline stop cancelled by framework deadline");
}
}
}
Ok(())
}
}
impl MiniChatModule {
async fn store_worker_runtime(
&self,
handles: WorkerHandles,
worker_cancel: CancellationToken,
) -> anyhow::Result<()> {
let worker_cancel_cleanup = worker_cancel.clone();
let cancel_already_set = {
let mut guard = self
.worker_cancel
.lock()
.map_err(|e| anyhow::anyhow!("worker_cancel lock: {e}"))?;
if guard.is_some() {
true
} else {
*guard = Some(worker_cancel);
false
}
};
if cancel_already_set {
worker_cancel_cleanup.cancel();
let hard_stop = CancellationToken::new();
hard_stop.cancel();
handles.join_all(hard_stop, WORKER_STOP_TIMEOUT).await;
anyhow::bail!("{} worker_cancel already set", Self::MODULE_NAME);
}
let mut handles = Some(handles);
let handles_err = {
match self.worker_handles.lock() {
Ok(mut guard) => {
if guard.is_some() {
Some("worker_handles already set".to_owned())
} else {
*guard = handles.take();
None
}
}
Err(e) => Some(format!("worker_handles lock: {e}")),
}
};
if let Some(msg) = handles_err {
if let Ok(mut cancel_guard) = self.worker_cancel.lock() {
cancel_guard.take();
}
worker_cancel_cleanup.cancel();
if let Some(handles) = handles {
let hard_stop = CancellationToken::new();
hard_stop.cancel();
handles.join_all(hard_stop, WORKER_STOP_TIMEOUT).await;
}
anyhow::bail!("{} {msg}", Self::MODULE_NAME);
}
Ok(())
}
}
async fn exchange_client_credentials(
authn: &Arc<dyn AuthNResolverClient>,
creds: &crate::config::ClientCredentialsConfig,
) -> anyhow::Result<modkit_security::SecurityContext> {
info!("Exchanging client credentials for OAGW provisioning context");
let request = ClientCredentialsRequest {
client_id: creds.client_id.clone(),
client_secret: creds.client_secret.clone(),
scopes: Vec::new(),
};
let result = authn
.exchange_client_credentials(&request)
.await
.map_err(|e| anyhow::anyhow!("client credentials exchange failed: {e}"))?;
info!("Security context obtained for OAGW provisioning");
Ok(result.security_context)
}
async fn register_plugin_schemas(
registry: &dyn TypesRegistryClient,
schemas: &[(String, &str, &str)],
) -> anyhow::Result<()> {
let mut payload = Vec::with_capacity(schemas.len());
for (schema_str, schema_id, _label) in schemas {
let mut schema_json: serde_json::Value = serde_json::from_str(schema_str)?;
let obj = schema_json
.as_object_mut()
.ok_or_else(|| anyhow::anyhow!("schema {schema_id} is not a JSON object"))?;
obj.insert(
"additionalProperties".to_owned(),
serde_json::Value::Bool(false),
);
payload.push(schema_json);
}
let results = registry.register(payload).await?;
RegisterResult::ensure_all_ok(&results)?;
for (_schema_str, schema_id, label) in schemas {
info!(schema_id = %schema_id, "Registered {label} plugin schema in types-registry");
}
Ok(())
}