Skip to main content

mini_chat/
module.rs

1use std::sync::{Arc, Mutex, OnceLock};
2
3use async_trait::async_trait;
4use authn_resolver_sdk::{AuthNResolverClient, ClientCredentialsRequest};
5use authz_resolver_sdk::AuthZResolverClient;
6use mini_chat_sdk::{MiniChatAuditPluginSpecV1, MiniChatModelPolicyPluginSpecV1};
7use modkit::api::OpenApiRegistry;
8use modkit::contracts::RunnableCapability;
9use modkit::{DatabaseCapability, Module, ModuleCtx, RestApiCapability};
10use std::time::Duration;
11
12use modkit_db::outbox::{LeaseConfig, Outbox, OutboxHandle, Partitions};
13use oagw_sdk::ServiceGatewayClientV1;
14use sea_orm_migration::MigrationTrait;
15use tokio_util::sync::CancellationToken;
16use tracing::info;
17use types_registry_sdk::{RegisterResult, TypesRegistryClient};
18
19use crate::api::rest::routes;
20use crate::background_workers::{self, WORKER_STOP_TIMEOUT, WorkerConfigs};
21use crate::config::ProviderEntry;
22use crate::domain::ports::MiniChatMetricsPort;
23use crate::domain::service::{AppServices as GenericAppServices, Repositories};
24use crate::infra::metrics::MiniChatMetricsMeter;
25use crate::infra::outbox::{AuditEventHandler, InfraOutboxEnqueuer, UsageEventHandler};
26use crate::infra::workers::WorkerHandles;
27
/// Concrete [`GenericAppServices`] instantiation used by this module.
///
/// Binds every repository type parameter of the generic service container
/// to the corresponding `crate::infra::db::repo` implementation.
pub(crate) type AppServices = GenericAppServices<
    TurnRepository,
    MessageRepository,
    QuotaUsageRepository,
    ReactionRepository,
    ChatRepository,
    ThreadSummaryRepository,
    AttachmentRepository,
    VectorStoreRepository,
    MessageAttachmentRepository,
>;
39use crate::infra::audit_gateway::AuditGateway;
40use crate::infra::db::repo::attachment_repo::AttachmentRepository;
41use crate::infra::db::repo::chat_repo::ChatRepository;
42use crate::infra::db::repo::message_attachment_repo::MessageAttachmentRepository;
43use crate::infra::db::repo::message_repo::MessageRepository;
44use crate::infra::db::repo::quota_usage_repo::QuotaUsageRepository;
45use crate::infra::db::repo::reaction_repo::ReactionRepository;
46use crate::infra::db::repo::thread_summary_repo::ThreadSummaryRepository;
47use crate::infra::db::repo::turn_repo::TurnRepository;
48use crate::infra::db::repo::vector_store_repo::VectorStoreRepository;
49use crate::infra::llm::provider_resolver::ProviderResolver;
50use crate::infra::model_policy::ModelPolicyGateway;
51
52/// Default URL prefix for all mini-chat REST routes.
53pub const DEFAULT_URL_PREFIX: &str = "/mini-chat";
54
/// The mini-chat module: multi-tenant AI chat with SSE streaming.
///
/// Fields wrapped in `OnceLock` are written once during `init()`; fields
/// wrapped in `Mutex<Option<_>>` hold runtime handles that `start()` stores
/// and `stop()` takes.
#[modkit::module(
    name = "mini-chat",
    deps = ["types-registry", "authn-resolver", "authz-resolver", "oagw"],
    capabilities = [db, rest, stateful],
)]
pub struct MiniChatModule {
    /// Application services; set at the end of `init()` and read by REST
    /// route registration and by `start()`.
    service: OnceLock<Arc<AppServices>>,
    /// URL prefix taken from the module config in `init()`; read when REST
    /// routes are registered.
    url_prefix: OnceLock<String>,
    /// Handle of the running outbox pipeline; stored by `start()`, taken
    /// and stopped by `stop()`.
    outbox_handle: Mutex<Option<OutboxHandle>>,
    /// OAGW gateway + provider config for deferred upstream registration in `start()`.
    oagw_deferred: OnceLock<OagwDeferred>,
    /// Worker configs captured in `init()`, consumed by `start()`.
    worker_configs: OnceLock<WorkerConfigs>,
    /// Cancellation token for the spawned background workers; cancelled
    /// during `stop()`.
    worker_cancel: Mutex<Option<CancellationToken>>,
    /// Handles to spawned background workers — joined during `stop()`.
    worker_handles: Mutex<Option<WorkerHandles>>,
    /// Deferred outbox pipeline params — built in `init()`, started in `start()`.
    outbox_deferred: OnceLock<OutboxDeferred>,
}
75
/// State needed to register OAGW upstreams in `start()` (after GTS is ready).
struct OagwDeferred {
    /// Gateway client passed to the OAGW upstream registration call.
    gateway: Arc<dyn ServiceGatewayClientV1>,
    /// AuthN resolver used to exchange the client credentials for a
    /// security context.
    authn: Arc<dyn AuthNResolverClient>,
    /// Client id / secret sent in the client-credentials exchange.
    client_credentials: crate::config::ClientCredentialsConfig,
    /// Copy of the provider map captured in `init()` after the
    /// `upstream_alias` fallback has been filled in.
    providers: std::collections::HashMap<String, ProviderEntry>,
}
83
/// State needed to build + start the outbox pipeline in `start()`.
/// Captured in `init()`, consumed in `start()` after OAGW registration.
struct OutboxDeferred {
    /// Database provider; supplies the outbox connection and is shared
    /// with the cleanup and thread-summary handlers.
    db: Arc<crate::domain::service::DbProvider>,
    /// Queue names and partition count for the outbox queues.
    outbox_config: crate::config::OutboxConfig,
    /// Cleanup settings; `max_attempts` is passed to both cleanup handlers.
    cleanup_config: crate::config::background::CleanupWorkerConfig,
    /// Model-policy gateway given to the usage event handler.
    model_policy_gw: Arc<ModelPolicyGateway>,
    /// Audit gateway given to the audit event handler.
    audit_gateway: Arc<AuditGateway>,
    /// File storage provider used by the cleanup handlers.
    file_storage: Arc<dyn crate::domain::ports::FileStorageProvider>,
    /// Vector store provider used by the chat cleanup handler.
    vector_store_prov: Arc<dyn crate::domain::ports::VectorStoreProvider>,
    /// Metrics port shared by the outbox handlers.
    metrics: Arc<dyn MiniChatMetricsPort>,
    /// Lazy enqueuer; receives the outbox once the pipeline has started.
    enqueuer: Arc<InfraOutboxEnqueuer>,
    /// Provider resolver handed to the thread-summary handler.
    provider_resolver: Arc<crate::infra::llm::provider_resolver::ProviderResolver>,
    /// Model resolver handed to the thread-summary handler.
    model_resolver: Arc<dyn crate::domain::repos::ModelResolver>,
    /// Configuration for the thread-summary handler.
    thread_summary_config: crate::config::background::ThreadSummaryWorkerConfig,
}
100
101impl Default for MiniChatModule {
102    fn default() -> Self {
103        Self {
104            service: OnceLock::new(),
105            url_prefix: OnceLock::new(),
106            outbox_handle: Mutex::new(None),
107            oagw_deferred: OnceLock::new(),
108            worker_configs: OnceLock::new(),
109            worker_cancel: Mutex::new(None),
110            worker_handles: Mutex::new(None),
111            outbox_deferred: OnceLock::new(),
112        }
113    }
114}
115
116#[allow(clippy::too_many_lines)]
117#[async_trait]
118impl Module for MiniChatModule {
119    async fn init(&self, ctx: &ModuleCtx) -> anyhow::Result<()> {
120        info!("Initializing {} module", Self::MODULE_NAME);
121
122        let mut cfg: crate::config::MiniChatConfig = ctx.config_expanded_or_default()?;
123        cfg.streaming
124            .validate()
125            .map_err(|e| anyhow::anyhow!("streaming config: {e}"))?;
126        cfg.estimation_budgets
127            .validate()
128            .map_err(|e| anyhow::anyhow!("estimation_budgets config: {e}"))?;
129        cfg.quota
130            .validate()
131            .map_err(|e| anyhow::anyhow!("quota config: {e}"))?;
132        cfg.outbox
133            .validate()
134            .map_err(|e| anyhow::anyhow!("outbox config: {e}"))?;
135        cfg.context
136            .validate()
137            .map_err(|e| anyhow::anyhow!("context config: {e}"))?;
138        cfg.client_credentials
139            .validate()
140            .map_err(|e| anyhow::anyhow!("client_credentials config: {e}"))?;
141        for (id, entry) in &cfg.providers {
142            entry
143                .validate(id)
144                .map_err(|e| anyhow::anyhow!("providers config: {e}"))?;
145        }
146        cfg.orphan_watchdog
147            .validate()
148            .map_err(|e| anyhow::anyhow!("orphan_watchdog config: {e}"))?;
149        cfg.thread_summary_worker
150            .validate()
151            .map_err(|e| anyhow::anyhow!("thread_summary_worker config: {e}"))?;
152        cfg.cleanup_worker
153            .validate()
154            .map_err(|e| anyhow::anyhow!("cleanup_worker config: {e}"))?;
155        cfg.thumbnail
156            .validate()
157            .map_err(|e| anyhow::anyhow!("thumbnail config: {e}"))?;
158        cfg.rag
159            .validate()
160            .map_err(|e| anyhow::anyhow!("rag config: {e}"))?;
161
162        let vendor = cfg.vendor.trim().to_owned();
163        if vendor.is_empty() {
164            return Err(anyhow::anyhow!(
165                "{}: vendor must be a non-empty string",
166                Self::MODULE_NAME
167            ));
168        }
169
170        let registry = ctx.client_hub().get::<dyn TypesRegistryClient>()?;
171        register_plugin_schemas(
172            &*registry,
173            &[
174                (
175                    MiniChatModelPolicyPluginSpecV1::gts_schema_with_refs_as_string(),
176                    MiniChatModelPolicyPluginSpecV1::gts_schema_id(),
177                    "model-policy",
178                ),
179                (
180                    MiniChatAuditPluginSpecV1::gts_schema_with_refs_as_string(),
181                    MiniChatAuditPluginSpecV1::gts_schema_id(),
182                    "audit",
183                ),
184            ],
185        )
186        .await?;
187
188        self.url_prefix
189            .set(cfg.url_prefix)
190            .map_err(|_| anyhow::anyhow!("{} url_prefix already set", Self::MODULE_NAME))?;
191
192        let db_provider = ctx.db_required()?;
193        let db = Arc::new(db_provider);
194
195        // Create the model-policy gateway early for both outbox handler and services.
196        let model_policy_gw = Arc::new(ModelPolicyGateway::new(ctx.client_hub(), vendor.clone()));
197
198        // Audit gateway: lazily resolves audit plugin(s) on first emission.
199        let audit_gateway = Arc::new(AuditGateway::new(ctx.client_hub(), vendor));
200
201        // ── Resolve infrastructure deps needed by both outbox handlers and services ──
202
203        let authz = ctx
204            .client_hub()
205            .get::<dyn AuthZResolverClient>()
206            .map_err(|e| anyhow::anyhow!("failed to get AuthZ resolver: {e}"))?;
207
208        let authn_client = ctx
209            .client_hub()
210            .get::<dyn AuthNResolverClient>()
211            .map_err(|e| anyhow::anyhow!("failed to get AuthN resolver: {e}"))?;
212
213        let gateway = ctx
214            .client_hub()
215            .get::<dyn ServiceGatewayClientV1>()
216            .map_err(|e| anyhow::anyhow!("failed to get OAGW gateway: {e}"))?;
217
218        // Pre-fill upstream_alias with host as fallback so ProviderResolver
219        // works immediately. The actual OAGW registration is deferred to
220        // start() because GTS instances are not visible via list() until
221        // post_init (types-registry switches to ready mode there).
222        for entry in cfg.providers.values_mut() {
223            if entry.upstream_alias.is_none() {
224                entry.upstream_alias = Some(entry.host.clone());
225            }
226            for ovr in entry.tenant_overrides.values_mut() {
227                if ovr.upstream_alias.is_none()
228                    && let Some(ref h) = ovr.host
229                {
230                    ovr.upstream_alias = Some(h.clone());
231                }
232            }
233        }
234
235        // Save a copy for deferred OAGW registration in start().
236        // Ignore the result: if already set, we keep the first value.
237        drop(self.oagw_deferred.set(OagwDeferred {
238            gateway: Arc::clone(&gateway),
239            authn: Arc::clone(&authn_client),
240            client_credentials: cfg.client_credentials.clone(),
241            providers: cfg.providers.clone(),
242        }));
243
244        let provider_resolver = Arc::new(ProviderResolver::new(&gateway, cfg.providers));
245
246        let repos = Repositories {
247            chat: Arc::new(ChatRepository::new(modkit_db::odata::LimitCfg {
248                default: 20,
249                max: 100,
250            })),
251            attachment: Arc::new(AttachmentRepository),
252            message: Arc::new(MessageRepository::new(modkit_db::odata::LimitCfg {
253                default: 20,
254                max: 100,
255            })),
256            quota: Arc::new(QuotaUsageRepository),
257            turn: Arc::new(TurnRepository),
258            reaction: Arc::new(ReactionRepository),
259            thread_summary: Arc::new(ThreadSummaryRepository),
260            vector_store: Arc::new(VectorStoreRepository),
261            message_attachment: Arc::new(MessageAttachmentRepository),
262        };
263
264        let rag_client = Arc::new(
265            crate::infra::llm::providers::rag_http_client::RagHttpClient::new(Arc::clone(&gateway)),
266        );
267
268        // Build provider-specific file/vector store impls per provider entry.
269        // Dispatch by storage_kind: Azure → Azure impls, OpenAi → OpenAI impls.
270        let mut file_impls: std::collections::HashMap<
271            String,
272            Arc<dyn crate::domain::ports::FileStorageProvider>,
273        > = std::collections::HashMap::new();
274        let mut vs_impls: std::collections::HashMap<
275            String,
276            Arc<dyn crate::domain::ports::VectorStoreProvider>,
277        > = std::collections::HashMap::new();
278        for (provider_id, entry) in provider_resolver.entries() {
279            let (file, vs): (
280                Arc<dyn crate::domain::ports::FileStorageProvider>,
281                Arc<dyn crate::domain::ports::VectorStoreProvider>,
282            ) = match entry.storage_kind {
283                crate::config::StorageKind::Azure => {
284                    let api_version = entry.api_version.clone().unwrap_or_else(|| {
285                        panic!(
286                            "provider '{provider_id}': storage_kind is 'azure' \
287                             but api_version is not set"
288                        )
289                    });
290                    (
291                        Arc::new(
292                            crate::infra::llm::providers::azure_file_storage::AzureFileStorage::new(
293                                Arc::clone(&rag_client),
294                                Arc::clone(&provider_resolver),
295                                api_version.clone(),
296                            ),
297                        ),
298                        Arc::new(
299                            crate::infra::llm::providers::azure_vector_store::AzureVectorStore::new(
300                                Arc::clone(&rag_client),
301                                Arc::clone(&provider_resolver),
302                                api_version,
303                            ),
304                        ),
305                    )
306                }
307                crate::config::StorageKind::OpenAi => (
308                    Arc::new(
309                        crate::infra::llm::providers::openai_file_storage::OpenAiFileStorage::new(
310                            Arc::clone(&rag_client),
311                            Arc::clone(&provider_resolver),
312                        ),
313                    ),
314                    Arc::new(
315                        crate::infra::llm::providers::openai_vector_store::OpenAiVectorStore::new(
316                            Arc::clone(&rag_client),
317                            Arc::clone(&provider_resolver),
318                        ),
319                    ),
320                ),
321            };
322            file_impls.insert(provider_id.clone(), file);
323            vs_impls.insert(provider_id.clone(), vs);
324        }
325        let file_storage: Arc<dyn crate::domain::ports::FileStorageProvider> = Arc::new(
326            crate::infra::llm::providers::dispatching_storage::DispatchingFileStorage::new(
327                file_impls,
328            ),
329        );
330        let vector_store_prov: Arc<dyn crate::domain::ports::VectorStoreProvider> = Arc::new(
331            crate::infra::llm::providers::dispatching_storage::DispatchingVectorStore::new(
332                vs_impls,
333            ),
334        );
335
336        // ── Metrics ─────────────────────────────────────────────────────────
337
338        let metrics_prefix = cfg.metrics.effective_prefix(Self::MODULE_NAME);
339        let scope =
340            opentelemetry::InstrumentationScope::builder(Self::MODULE_NAME.to_owned()).build();
341        let metrics: Arc<dyn MiniChatMetricsPort> = Arc::new(MiniChatMetricsMeter::new(
342            &opentelemetry::global::meter_with_scope(scope),
343            &metrics_prefix,
344        ));
345
346        // ── Outbox enqueuer (lazy) ────────────────────────────────────────
347        //
348        // The enqueuer is created now (services need it), but the actual outbox
349        // pipeline starts in start() -- after OAGW upstreams are registered.
350        // HTTP traffic doesn't arrive until after start(), so enqueue() is never
351        // called before the outbox handle is set.
352
353        let outbox_enqueuer = Arc::new(InfraOutboxEnqueuer::new(
354            cfg.outbox.queue_name.clone(),
355            cfg.outbox.cleanup_queue_name.clone(),
356            cfg.outbox.chat_cleanup_queue_name.clone(),
357            cfg.outbox.thread_summary_queue_name.clone(),
358            cfg.outbox.audit_queue_name.clone(),
359            cfg.outbox.num_partitions,
360        ));
361
362        // Save params for start() to build + start the outbox pipeline.
363        drop(self.outbox_deferred.set(OutboxDeferred {
364            db: Arc::clone(&db),
365            outbox_config: cfg.outbox,
366            cleanup_config: cfg.cleanup_worker,
367            model_policy_gw: model_policy_gw.clone(),
368            audit_gateway: Arc::clone(&audit_gateway),
369            file_storage: Arc::clone(&file_storage),
370            vector_store_prov: Arc::clone(&vector_store_prov),
371            metrics: Arc::clone(&metrics),
372            enqueuer: Arc::clone(&outbox_enqueuer),
373            provider_resolver: Arc::clone(&provider_resolver),
374            model_resolver: model_policy_gw.clone() as Arc<dyn crate::domain::repos::ModelResolver>,
375            thread_summary_config: cfg.thread_summary_worker.clone(),
376        }));
377
378        // ── Services ────────────────────────────────────────────────────────
379
380        let services = Arc::new(AppServices::new(
381            &repos,
382            db,
383            authz,
384            &(model_policy_gw.clone() as Arc<dyn crate::domain::repos::ModelResolver>),
385            &provider_resolver,
386            cfg.streaming,
387            model_policy_gw.clone() as Arc<dyn crate::domain::repos::PolicySnapshotProvider>,
388            model_policy_gw as Arc<dyn crate::domain::repos::UserLimitsProvider>,
389            cfg.estimation_budgets,
390            cfg.quota,
391            &(outbox_enqueuer as Arc<dyn crate::domain::repos::OutboxEnqueuer>),
392            cfg.context,
393            file_storage,
394            vector_store_prov,
395            cfg.rag,
396            cfg.thumbnail,
397            metrics,
398            cfg.thread_summary_worker,
399        ));
400
401        self.service
402            .set(services)
403            .map_err(|_| anyhow::anyhow!("{} module already initialized", Self::MODULE_NAME))?;
404
405        self.worker_configs
406            .set(WorkerConfigs {
407                orphan_watchdog: cfg.orphan_watchdog,
408            })
409            .map_err(|_| anyhow::anyhow!("{} worker_configs already set", Self::MODULE_NAME))?;
410
411        info!("{} module initialized successfully", Self::MODULE_NAME);
412        Ok(())
413    }
414}
415
416impl DatabaseCapability for MiniChatModule {
417    fn migrations(&self) -> Vec<Box<dyn MigrationTrait>> {
418        use sea_orm_migration::MigratorTrait;
419        info!("Providing mini-chat database migrations");
420        let mut m = crate::infra::db::migrations::Migrator::migrations();
421        m.extend(modkit_db::outbox::outbox_migrations());
422        m
423    }
424}
425
426impl RestApiCapability for MiniChatModule {
427    fn register_rest(
428        &self,
429        _ctx: &ModuleCtx,
430        router: axum::Router,
431        openapi: &dyn OpenApiRegistry,
432    ) -> anyhow::Result<axum::Router> {
433        let services = self
434            .service
435            .get()
436            .ok_or_else(|| anyhow::anyhow!("{} not initialized", Self::MODULE_NAME))?;
437
438        info!("Registering mini-chat REST routes");
439        let prefix = self
440            .url_prefix
441            .get()
442            .ok_or_else(|| anyhow::anyhow!("{} not initialized (url_prefix)", Self::MODULE_NAME))?;
443
444        let router = routes::register_routes(router, openapi, Arc::clone(services), prefix);
445        info!("Mini-chat REST routes registered successfully");
446        Ok(router)
447    }
448}
449
450#[async_trait]
451impl RunnableCapability for MiniChatModule {
452    async fn start(&self, cancel: CancellationToken) -> anyhow::Result<()> {
453        let wc = self.worker_configs.get().ok_or_else(|| {
454            anyhow::anyhow!(
455                "{} worker_configs not set - init() must run before start()",
456                Self::MODULE_NAME
457            )
458        })?;
459        let leader_elector = background_workers::prepare_worker_runtime(wc).await?;
460
461        // Register OAGW upstreams now that GTS is in ready mode (post_init
462        // has completed). During init() this fails because types-registry
463        // list() only queries the persistent store which is empty until
464        // switch_to_ready().
465        if let Some(deferred) = self.oagw_deferred.get() {
466            let ctx =
467                exchange_client_credentials(&deferred.authn, &deferred.client_credentials).await?;
468            let mut providers = deferred.providers.clone();
469            crate::infra::oagw_provisioning::register_oagw_upstreams(
470                &deferred.gateway,
471                &ctx,
472                &mut providers,
473            )
474            .await?;
475        }
476
477        // Start the outbox pipeline now that OAGW upstreams are registered.
478        // Cleanup handlers can immediately call provider DELETE via OAGW.
479        if let Some(od) = self.outbox_deferred.get() {
480            let outbox_db = od.db.db();
481            let num_partitions = od.outbox_config.num_partitions;
482            let max_cleanup_attempts = od.cleanup_config.max_attempts;
483
484            let partitions = Partitions::of(
485                u16::try_from(num_partitions)
486                    .map_err(|_| anyhow::anyhow!("num_partitions exceeds u16"))?,
487            );
488
489            let outbox_handle = Outbox::builder(outbox_db)
490                .queue(&od.outbox_config.queue_name, partitions)
491                .leased(UsageEventHandler {
492                    plugin_provider: od.model_policy_gw.clone(),
493                })
494                .queue(&od.outbox_config.cleanup_queue_name, partitions)
495                .leased(
496                    crate::infra::workers::cleanup_worker::AttachmentCleanupHandler::new(
497                        Arc::clone(&od.file_storage),
498                        Arc::clone(&od.db),
499                        ChatRepository::new(modkit_db::odata::LimitCfg {
500                            default: 20,
501                            max: 100,
502                        }),
503                        max_cleanup_attempts,
504                        Arc::clone(&od.metrics),
505                    ),
506                )
507                .queue(&od.outbox_config.chat_cleanup_queue_name, partitions)
508                .leased(
509                    crate::infra::workers::cleanup_worker::ChatCleanupHandler::new(
510                        Arc::clone(&od.file_storage),
511                        Arc::clone(&od.vector_store_prov),
512                        Arc::clone(&od.db),
513                        ChatRepository::new(modkit_db::odata::LimitCfg {
514                            default: 20,
515                            max: 100,
516                        }),
517                        max_cleanup_attempts,
518                        Arc::clone(&od.metrics),
519                    ),
520                )
521                .queue(&od.outbox_config.thread_summary_queue_name, partitions)
522                .leased(
523                    crate::infra::workers::thread_summary_worker::ThreadSummaryHandler::new(
524                        Arc::new(
525                            crate::infra::workers::thread_summary_worker::ThreadSummaryDeps {
526                                db: Arc::clone(&od.db),
527                                thread_summary_repo: Arc::new(ThreadSummaryRepository),
528                                message_repo: Arc::new(MessageRepository::new(
529                                    modkit_db::odata::LimitCfg {
530                                        default: 20,
531                                        max: 100,
532                                    },
533                                )),
534                                outbox_enqueuer: Arc::clone(&od.enqueuer)
535                                    as Arc<dyn crate::domain::repos::OutboxEnqueuer>,
536                                metrics: Arc::clone(&od.metrics),
537                                provider_resolver: Arc::clone(&od.provider_resolver),
538                                model_resolver: Arc::clone(&od.model_resolver),
539                                config: od.thread_summary_config.clone(),
540                            },
541                        ),
542                    ),
543                )
544                .queue(&od.outbox_config.audit_queue_name, partitions)
545                .leased(AuditEventHandler {
546                    audit_gateway: Arc::clone(&od.audit_gateway),
547                    metrics: Arc::clone(&od.metrics),
548                })
549                .lease(LeaseConfig {
550                    duration: Duration::from_mins(1),
551                    ..LeaseConfig::default()
552                })
553                .start()
554                .await
555                .map_err(|e| anyhow::anyhow!("outbox start: {e}"))?;
556
557            // Wire the outbox handle into the lazy enqueuer.
558            od.enqueuer.set_outbox(Arc::clone(outbox_handle.outbox()));
559
560            let mut guard = self
561                .outbox_handle
562                .lock()
563                .map_err(|e| anyhow::anyhow!("outbox_handle lock: {e}"))?;
564            *guard = Some(outbox_handle);
565
566            info!("Outbox pipeline started (OAGW ready)");
567        }
568
569        let orphan_deps = if wc.orphan_watchdog.enabled {
570            let services = self.service.get().ok_or_else(|| {
571                anyhow::anyhow!(
572                    "{} not initialized - init() must run before start()",
573                    Self::MODULE_NAME
574                )
575            })?;
576            Some(crate::infra::workers::orphan_watchdog::OrphanWatchdogDeps {
577                finalization_svc: Arc::clone(&services.finalization),
578                turn_repo: Arc::clone(&services.turn_repo),
579                db: Arc::clone(&services.db),
580                metrics: Arc::clone(&services.metrics),
581            })
582        } else {
583            None
584        };
585
586        let (handles, worker_cancel) =
587            background_workers::spawn_workers(wc, &cancel, leader_elector.as_ref(), orphan_deps)?;
588        self.store_worker_runtime(handles, worker_cancel).await?;
589
590        Ok(())
591    }
592
593    async fn stop(&self, cancel: CancellationToken) -> anyhow::Result<()> {
594        if let Some(worker_cancel) = self
595            .worker_cancel
596            .lock()
597            .map_err(|e| anyhow::anyhow!("worker_cancel lock: {e}"))?
598            .take()
599        {
600            worker_cancel.cancel();
601        }
602
603        let workers = self
604            .worker_handles
605            .lock()
606            .map_err(|e| anyhow::anyhow!("worker_handles lock: {e}"))?
607            .take();
608        if let Some(handles) = workers {
609            info!("Waiting for background workers to stop");
610            handles.join_all(cancel.clone(), WORKER_STOP_TIMEOUT).await;
611            info!("Background workers stopped");
612        }
613
614        let handle = self
615            .outbox_handle
616            .lock()
617            .map_err(|e| anyhow::anyhow!("outbox_handle lock: {e}"))?
618            .take();
619        if let Some(handle) = handle {
620            info!("Stopping outbox pipeline");
621            tokio::select! {
622                () = handle.stop() => {
623                    info!("Outbox pipeline stopped");
624                }
625                () = cancel.cancelled() => {
626                    info!("Outbox pipeline stop cancelled by framework deadline");
627                }
628            }
629        }
630        Ok(())
631    }
632}
633
634impl MiniChatModule {
635    async fn store_worker_runtime(
636        &self,
637        handles: WorkerHandles,
638        worker_cancel: CancellationToken,
639    ) -> anyhow::Result<()> {
640        let worker_cancel_cleanup = worker_cancel.clone();
641
642        // Store cancel token. Guard must not live across an await point.
643        let cancel_already_set = {
644            let mut guard = self
645                .worker_cancel
646                .lock()
647                .map_err(|e| anyhow::anyhow!("worker_cancel lock: {e}"))?;
648            if guard.is_some() {
649                true
650            } else {
651                *guard = Some(worker_cancel);
652                false
653            }
654            // guard dropped here — before any await
655        };
656        if cancel_already_set {
657            worker_cancel_cleanup.cancel();
658            let hard_stop = CancellationToken::new();
659            hard_stop.cancel();
660            handles.join_all(hard_stop, WORKER_STOP_TIMEOUT).await;
661            anyhow::bail!("{} worker_cancel already set", Self::MODULE_NAME);
662        }
663
664        // Store handles. Guard must not live across an await point.
665        let mut handles = Some(handles);
666        let handles_err = {
667            match self.worker_handles.lock() {
668                Ok(mut guard) => {
669                    if guard.is_some() {
670                        Some("worker_handles already set".to_owned())
671                    } else {
672                        *guard = handles.take();
673                        None
674                    }
675                }
676                Err(e) => Some(format!("worker_handles lock: {e}")),
677            }
678            // guard dropped here — before any await
679        };
680        if let Some(msg) = handles_err {
681            if let Ok(mut cancel_guard) = self.worker_cancel.lock() {
682                cancel_guard.take();
683            }
684            worker_cancel_cleanup.cancel();
685            if let Some(handles) = handles {
686                let hard_stop = CancellationToken::new();
687                hard_stop.cancel();
688                handles.join_all(hard_stop, WORKER_STOP_TIMEOUT).await;
689            }
690            // handles was either moved into the mutex (not the error case)
691            // or never stored. In the "already set" case it was moved, so
692            // we rely on the cancel token to stop workers; their JoinHandles
693            // will be cleaned up when the existing WorkerHandles is joined
694            // in stop().
695            anyhow::bail!("{} {msg}", Self::MODULE_NAME);
696        }
697        Ok(())
698    }
699}
700
701/// Exchange `OAuth2` client credentials via the `AuthN` resolver to obtain
702/// a `SecurityContext` for OAGW upstream provisioning.
703async fn exchange_client_credentials(
704    authn: &Arc<dyn AuthNResolverClient>,
705    creds: &crate::config::ClientCredentialsConfig,
706) -> anyhow::Result<modkit_security::SecurityContext> {
707    info!("Exchanging client credentials for OAGW provisioning context");
708    let request = ClientCredentialsRequest {
709        client_id: creds.client_id.clone(),
710        client_secret: creds.client_secret.clone(),
711        scopes: Vec::new(),
712    };
713    let result = authn
714        .exchange_client_credentials(&request)
715        .await
716        .map_err(|e| anyhow::anyhow!("client credentials exchange failed: {e}"))?;
717    info!("Security context obtained for OAGW provisioning");
718    Ok(result.security_context)
719}
720
721async fn register_plugin_schemas(
722    registry: &dyn TypesRegistryClient,
723    schemas: &[(String, &str, &str)],
724) -> anyhow::Result<()> {
725    let mut payload = Vec::with_capacity(schemas.len());
726    for (schema_str, schema_id, _label) in schemas {
727        let mut schema_json: serde_json::Value = serde_json::from_str(schema_str)?;
728        let obj = schema_json
729            .as_object_mut()
730            .ok_or_else(|| anyhow::anyhow!("schema {schema_id} is not a JSON object"))?;
731        obj.insert(
732            "additionalProperties".to_owned(),
733            serde_json::Value::Bool(false),
734        );
735        payload.push(schema_json);
736    }
737    let results = registry.register(payload).await?;
738    RegisterResult::ensure_all_ok(&results)?;
739    for (_schema_str, schema_id, label) in schemas {
740        info!(schema_id = %schema_id, "Registered {label} plugin schema in types-registry");
741    }
742    Ok(())
743}