Skip to main content

mini_chat/
module.rs

1use std::sync::{Arc, Mutex, OnceLock};
2
3use async_trait::async_trait;
4use authn_resolver_sdk::{AuthNResolverClient, ClientCredentialsRequest};
5use authz_resolver_sdk::AuthZResolverClient;
6use mini_chat_sdk::{MiniChatAuditPluginSpecV1, MiniChatModelPolicyPluginSpecV1};
7use modkit::api::OpenApiRegistry;
8use modkit::contracts::RunnableCapability;
9use modkit::{DatabaseCapability, Module, ModuleCtx, RestApiCapability};
10use std::time::Duration;
11
12use modkit_db::outbox::{LeaseConfig, Outbox, OutboxHandle, Partitions};
13use oagw_sdk::ServiceGatewayClientV1;
14use sea_orm_migration::MigrationTrait;
15use tokio_util::sync::CancellationToken;
16use tracing::info;
17use types_registry_sdk::{RegisterResult, TypesRegistryClient};
18
19use crate::api::rest::routes;
20use crate::background_workers::{self, WORKER_STOP_TIMEOUT, WorkerConfigs};
21use crate::config::ProviderEntry;
22use crate::domain::ports::MiniChatMetricsPort;
23use crate::domain::service::{AppServices as GenericAppServices, Repositories};
24use crate::infra::metrics::MiniChatMetricsMeter;
25use crate::infra::outbox::{AuditEventHandler, InfraOutboxEnqueuer, UsageEventHandler};
26use crate::infra::workers::WorkerHandles;
27
/// Concrete service container for this module: the generic
/// `crate::domain::service::AppServices` instantiated with the repository
/// implementations from `crate::infra::db::repo`.
pub(crate) type AppServices = GenericAppServices<
    TurnRepository,
    MessageRepository,
    QuotaUsageRepository,
    ReactionRepository,
    ChatRepository,
    ThreadSummaryRepository,
    AttachmentRepository,
    VectorStoreRepository,
    MessageAttachmentRepository,
>;
39use crate::infra::audit_gateway::AuditGateway;
40use crate::infra::db::repo::attachment_repo::AttachmentRepository;
41use crate::infra::db::repo::chat_repo::ChatRepository;
42use crate::infra::db::repo::message_attachment_repo::MessageAttachmentRepository;
43use crate::infra::db::repo::message_repo::MessageRepository;
44use crate::infra::db::repo::quota_usage_repo::QuotaUsageRepository;
45use crate::infra::db::repo::reaction_repo::ReactionRepository;
46use crate::infra::db::repo::thread_summary_repo::ThreadSummaryRepository;
47use crate::infra::db::repo::turn_repo::TurnRepository;
48use crate::infra::db::repo::vector_store_repo::VectorStoreRepository;
49use crate::infra::llm::provider_resolver::ProviderResolver;
50use crate::infra::model_policy::ModelPolicyGateway;
51
/// Default URL prefix for all mini-chat REST routes.
///
/// Not referenced elsewhere in this file: the prefix actually mounted by
/// `register_rest()` comes from the config's `url_prefix` (presumably defaulting
/// to this value — confirm in `crate::config`).
pub const DEFAULT_URL_PREFIX: &str = "/mini-chat";
54
/// The mini-chat module: multi-tenant AI chat with SSE streaming.
///
/// Lifecycle state is split between `init()` and `start()`:
/// - `init()` sets up the config, the service graph, and the deferred setup captured in the `*_deferred` slots.
/// - `start()` performs OAGW upstream registration, starts the outbox pipeline, and spawns the background workers.
///
/// `stop()` tears the runtime parts down again.
#[modkit::module(
    name = "mini-chat",
    deps = ["types-registry", "authn-resolver", "authz-resolver", "oagw"],
    capabilities = [db, rest, stateful],
)]
pub struct MiniChatModule {
    /// Service graph built in `init()`; read by `register_rest()` and `start()`.
    service: OnceLock<Arc<AppServices>>,
    /// REST route prefix taken from config in `init()`.
    url_prefix: OnceLock<String>,
    /// Running outbox pipeline: stored by `start()`, taken and stopped by `stop()`.
    outbox_handle: Mutex<Option<OutboxHandle>>,
    /// OAGW gateway + provider config for deferred upstream registration in `start()`.
    oagw_deferred: OnceLock<OagwDeferred>,
    /// Worker configs captured in `init()`, consumed by `start()`.
    worker_configs: OnceLock<WorkerConfigs>,
    /// Cancellation token for the background workers: stored after spawning in
    /// `start()`, taken and cancelled by `stop()`.
    worker_cancel: Mutex<Option<CancellationToken>>,
    /// Handles to spawned background workers — joined during `stop()`.
    worker_handles: Mutex<Option<WorkerHandles>>,
    /// Deferred outbox pipeline params — built in `init()`, started in `start()`.
    outbox_deferred: OnceLock<OutboxDeferred>,
}
75
/// State needed to register OAGW upstreams in `start()` (after GTS is ready).
struct OagwDeferred {
    /// OAGW client the upstreams are registered with.
    gateway: Arc<dyn ServiceGatewayClientV1>,
    /// Resolver used to exchange `client_credentials` for a provisioning security context.
    authn: Arc<dyn AuthNResolverClient>,
    /// Client id/secret used for that exchange.
    client_credentials: crate::config::ClientCredentialsConfig,
    /// Provider config snapshot, taken after `upstream_alias` was pre-filled with
    /// each provider's host as a fallback.
    providers: std::collections::HashMap<String, ProviderEntry>,
}
83
/// State needed to build + start the outbox pipeline in `start()`.
/// Captured in `init()`, consumed in `start()` after OAGW registration.
struct OutboxDeferred {
    /// Database provider: its `db()` backs the outbox, and it is handed to the
    /// cleanup handlers.
    db: Arc<crate::domain::service::DbProvider>,
    /// Queue names and partition count for the outbox queues.
    outbox_config: crate::config::OutboxConfig,
    /// Cleanup settings; `max_attempts` is passed to both cleanup handlers.
    cleanup_config: crate::config::background::CleanupWorkerConfig,
    /// Plugin provider for the usage-event handler.
    model_policy_gw: Arc<ModelPolicyGateway>,
    /// Sink for the audit-event handler.
    audit_gateway: Arc<AuditGateway>,
    /// File storage used by the attachment and chat cleanup handlers.
    file_storage: Arc<dyn crate::domain::ports::FileStorageProvider>,
    /// Vector-store provider used by the chat cleanup handler.
    vector_store_prov: Arc<dyn crate::domain::ports::VectorStoreProvider>,
    /// Metrics shared by the cleanup and audit handlers.
    metrics: Arc<dyn MiniChatMetricsPort>,
    /// Lazy enqueuer created in `init()`; receives the outbox once it has started.
    enqueuer: Arc<InfraOutboxEnqueuer>,
}
97
98impl Default for MiniChatModule {
99    fn default() -> Self {
100        Self {
101            service: OnceLock::new(),
102            url_prefix: OnceLock::new(),
103            outbox_handle: Mutex::new(None),
104            oagw_deferred: OnceLock::new(),
105            worker_configs: OnceLock::new(),
106            worker_cancel: Mutex::new(None),
107            worker_handles: Mutex::new(None),
108            outbox_deferred: OnceLock::new(),
109        }
110    }
111}
112
113#[allow(clippy::too_many_lines)]
114#[async_trait]
115impl Module for MiniChatModule {
116    async fn init(&self, ctx: &ModuleCtx) -> anyhow::Result<()> {
117        info!("Initializing {} module", Self::MODULE_NAME);
118
119        let mut cfg: crate::config::MiniChatConfig = ctx.config_expanded()?;
120        cfg.streaming
121            .validate()
122            .map_err(|e| anyhow::anyhow!("streaming config: {e}"))?;
123        cfg.estimation_budgets
124            .validate()
125            .map_err(|e| anyhow::anyhow!("estimation_budgets config: {e}"))?;
126        cfg.quota
127            .validate()
128            .map_err(|e| anyhow::anyhow!("quota config: {e}"))?;
129        cfg.outbox
130            .validate()
131            .map_err(|e| anyhow::anyhow!("outbox config: {e}"))?;
132        cfg.context
133            .validate()
134            .map_err(|e| anyhow::anyhow!("context config: {e}"))?;
135        cfg.client_credentials
136            .validate()
137            .map_err(|e| anyhow::anyhow!("client_credentials config: {e}"))?;
138        for (id, entry) in &cfg.providers {
139            entry
140                .validate(id)
141                .map_err(|e| anyhow::anyhow!("providers config: {e}"))?;
142        }
143        cfg.orphan_watchdog
144            .validate()
145            .map_err(|e| anyhow::anyhow!("orphan_watchdog config: {e}"))?;
146        cfg.thread_summary_worker
147            .validate()
148            .map_err(|e| anyhow::anyhow!("thread_summary_worker config: {e}"))?;
149        cfg.cleanup_worker
150            .validate()
151            .map_err(|e| anyhow::anyhow!("cleanup_worker config: {e}"))?;
152        cfg.thumbnail
153            .validate()
154            .map_err(|e| anyhow::anyhow!("thumbnail config: {e}"))?;
155
156        let vendor = cfg.vendor.trim().to_owned();
157        if vendor.is_empty() {
158            return Err(anyhow::anyhow!(
159                "{}: vendor must be a non-empty string",
160                Self::MODULE_NAME
161            ));
162        }
163
164        let registry = ctx.client_hub().get::<dyn TypesRegistryClient>()?;
165        register_plugin_schemas(
166            &*registry,
167            &[
168                (
169                    MiniChatModelPolicyPluginSpecV1::gts_schema_with_refs_as_string(),
170                    MiniChatModelPolicyPluginSpecV1::gts_schema_id(),
171                    "model-policy",
172                ),
173                (
174                    MiniChatAuditPluginSpecV1::gts_schema_with_refs_as_string(),
175                    MiniChatAuditPluginSpecV1::gts_schema_id(),
176                    "audit",
177                ),
178            ],
179        )
180        .await?;
181
182        self.url_prefix
183            .set(cfg.url_prefix)
184            .map_err(|_| anyhow::anyhow!("{} url_prefix already set", Self::MODULE_NAME))?;
185
186        let db_provider = ctx.db_required()?;
187        let db = Arc::new(db_provider);
188
189        // Create the model-policy gateway early for both outbox handler and services.
190        let model_policy_gw = Arc::new(ModelPolicyGateway::new(ctx.client_hub(), vendor.clone()));
191
192        // Audit gateway: lazily resolves audit plugin(s) on first emission.
193        let audit_gateway = Arc::new(AuditGateway::new(ctx.client_hub(), vendor));
194
195        // ── Resolve infrastructure deps needed by both outbox handlers and services ──
196
197        let authz = ctx
198            .client_hub()
199            .get::<dyn AuthZResolverClient>()
200            .map_err(|e| anyhow::anyhow!("failed to get AuthZ resolver: {e}"))?;
201
202        let authn_client = ctx
203            .client_hub()
204            .get::<dyn AuthNResolverClient>()
205            .map_err(|e| anyhow::anyhow!("failed to get AuthN resolver: {e}"))?;
206
207        let gateway = ctx
208            .client_hub()
209            .get::<dyn ServiceGatewayClientV1>()
210            .map_err(|e| anyhow::anyhow!("failed to get OAGW gateway: {e}"))?;
211
212        // Pre-fill upstream_alias with host as fallback so ProviderResolver
213        // works immediately. The actual OAGW registration is deferred to
214        // start() because GTS instances are not visible via list() until
215        // post_init (types-registry switches to ready mode there).
216        for entry in cfg.providers.values_mut() {
217            if entry.upstream_alias.is_none() {
218                entry.upstream_alias = Some(entry.host.clone());
219            }
220            for ovr in entry.tenant_overrides.values_mut() {
221                if ovr.upstream_alias.is_none()
222                    && let Some(ref h) = ovr.host
223                {
224                    ovr.upstream_alias = Some(h.clone());
225                }
226            }
227        }
228
229        // Save a copy for deferred OAGW registration in start().
230        // Ignore the result: if already set, we keep the first value.
231        drop(self.oagw_deferred.set(OagwDeferred {
232            gateway: Arc::clone(&gateway),
233            authn: Arc::clone(&authn_client),
234            client_credentials: cfg.client_credentials.clone(),
235            providers: cfg.providers.clone(),
236        }));
237
238        let provider_resolver = Arc::new(ProviderResolver::new(&gateway, cfg.providers));
239
240        let repos = Repositories {
241            chat: Arc::new(ChatRepository::new(modkit_db::odata::LimitCfg {
242                default: 20,
243                max: 100,
244            })),
245            attachment: Arc::new(AttachmentRepository),
246            message: Arc::new(MessageRepository::new(modkit_db::odata::LimitCfg {
247                default: 20,
248                max: 100,
249            })),
250            quota: Arc::new(QuotaUsageRepository),
251            turn: Arc::new(TurnRepository),
252            reaction: Arc::new(ReactionRepository),
253            thread_summary: Arc::new(ThreadSummaryRepository),
254            vector_store: Arc::new(VectorStoreRepository),
255            message_attachment: Arc::new(MessageAttachmentRepository),
256        };
257
258        let rag_client = Arc::new(
259            crate::infra::llm::providers::rag_http_client::RagHttpClient::new(Arc::clone(&gateway)),
260        );
261
262        // Build provider-specific file/vector store impls per provider entry.
263        // Dispatch by storage_kind: Azure → Azure impls, OpenAi → OpenAI impls.
264        let mut file_impls: std::collections::HashMap<
265            String,
266            Arc<dyn crate::domain::ports::FileStorageProvider>,
267        > = std::collections::HashMap::new();
268        let mut vs_impls: std::collections::HashMap<
269            String,
270            Arc<dyn crate::domain::ports::VectorStoreProvider>,
271        > = std::collections::HashMap::new();
272        for (provider_id, entry) in provider_resolver.entries() {
273            let (file, vs): (
274                Arc<dyn crate::domain::ports::FileStorageProvider>,
275                Arc<dyn crate::domain::ports::VectorStoreProvider>,
276            ) = match entry.storage_kind {
277                crate::config::StorageKind::Azure => {
278                    let api_version = entry.api_version.clone().unwrap_or_else(|| {
279                        panic!(
280                            "provider '{provider_id}': storage_kind is 'azure' \
281                             but api_version is not set"
282                        )
283                    });
284                    (
285                        Arc::new(
286                            crate::infra::llm::providers::azure_file_storage::AzureFileStorage::new(
287                                Arc::clone(&rag_client),
288                                Arc::clone(&provider_resolver),
289                                api_version.clone(),
290                            ),
291                        ),
292                        Arc::new(
293                            crate::infra::llm::providers::azure_vector_store::AzureVectorStore::new(
294                                Arc::clone(&rag_client),
295                                Arc::clone(&provider_resolver),
296                                api_version,
297                            ),
298                        ),
299                    )
300                }
301                crate::config::StorageKind::OpenAi => (
302                    Arc::new(
303                        crate::infra::llm::providers::openai_file_storage::OpenAiFileStorage::new(
304                            Arc::clone(&rag_client),
305                            Arc::clone(&provider_resolver),
306                        ),
307                    ),
308                    Arc::new(
309                        crate::infra::llm::providers::openai_vector_store::OpenAiVectorStore::new(
310                            Arc::clone(&rag_client),
311                            Arc::clone(&provider_resolver),
312                        ),
313                    ),
314                ),
315            };
316            file_impls.insert(provider_id.clone(), file);
317            vs_impls.insert(provider_id.clone(), vs);
318        }
319        let file_storage: Arc<dyn crate::domain::ports::FileStorageProvider> = Arc::new(
320            crate::infra::llm::providers::dispatching_storage::DispatchingFileStorage::new(
321                file_impls,
322            ),
323        );
324        let vector_store_prov: Arc<dyn crate::domain::ports::VectorStoreProvider> = Arc::new(
325            crate::infra::llm::providers::dispatching_storage::DispatchingVectorStore::new(
326                vs_impls,
327            ),
328        );
329
330        // ── Metrics ─────────────────────────────────────────────────────────
331
332        let metrics_prefix = cfg.metrics.effective_prefix(Self::MODULE_NAME);
333        let scope =
334            opentelemetry::InstrumentationScope::builder(Self::MODULE_NAME.to_owned()).build();
335        let metrics: Arc<dyn MiniChatMetricsPort> = Arc::new(MiniChatMetricsMeter::new(
336            &opentelemetry::global::meter_with_scope(scope),
337            &metrics_prefix,
338        ));
339
340        // ── Outbox enqueuer (lazy) ────────────────────────────────────────
341        //
342        // The enqueuer is created now (services need it), but the actual outbox
343        // pipeline starts in start() -- after OAGW upstreams are registered.
344        // HTTP traffic doesn't arrive until after start(), so enqueue() is never
345        // called before the outbox handle is set.
346
347        let outbox_enqueuer = Arc::new(InfraOutboxEnqueuer::new(
348            cfg.outbox.queue_name.clone(),
349            cfg.outbox.cleanup_queue_name.clone(),
350            cfg.outbox.chat_cleanup_queue_name.clone(),
351            cfg.outbox.thread_summary_queue_name.clone(),
352            cfg.outbox.audit_queue_name.clone(),
353            cfg.outbox.num_partitions,
354        ));
355
356        // Save params for start() to build + start the outbox pipeline.
357        drop(self.outbox_deferred.set(OutboxDeferred {
358            db: Arc::clone(&db),
359            outbox_config: cfg.outbox,
360            cleanup_config: cfg.cleanup_worker,
361            model_policy_gw: model_policy_gw.clone(),
362            audit_gateway: Arc::clone(&audit_gateway),
363            file_storage: Arc::clone(&file_storage),
364            vector_store_prov: Arc::clone(&vector_store_prov),
365            metrics: Arc::clone(&metrics),
366            enqueuer: Arc::clone(&outbox_enqueuer),
367        }));
368
369        // ── Services ────────────────────────────────────────────────────────
370
371        let services = Arc::new(AppServices::new(
372            &repos,
373            db,
374            authz,
375            &(model_policy_gw.clone() as Arc<dyn crate::domain::repos::ModelResolver>),
376            &provider_resolver,
377            cfg.streaming,
378            model_policy_gw.clone() as Arc<dyn crate::domain::repos::PolicySnapshotProvider>,
379            model_policy_gw as Arc<dyn crate::domain::repos::UserLimitsProvider>,
380            cfg.estimation_budgets,
381            cfg.quota,
382            &(outbox_enqueuer as Arc<dyn crate::domain::repos::OutboxEnqueuer>),
383            cfg.context,
384            file_storage,
385            vector_store_prov,
386            cfg.rag,
387            cfg.thumbnail,
388            metrics,
389        ));
390
391        self.service
392            .set(services)
393            .map_err(|_| anyhow::anyhow!("{} module already initialized", Self::MODULE_NAME))?;
394
395        self.worker_configs
396            .set(WorkerConfigs {
397                orphan_watchdog: cfg.orphan_watchdog,
398            })
399            .map_err(|_| anyhow::anyhow!("{} worker_configs already set", Self::MODULE_NAME))?;
400
401        info!("{} module initialized successfully", Self::MODULE_NAME);
402        Ok(())
403    }
404}
405
406impl DatabaseCapability for MiniChatModule {
407    fn migrations(&self) -> Vec<Box<dyn MigrationTrait>> {
408        use sea_orm_migration::MigratorTrait;
409        info!("Providing mini-chat database migrations");
410        let mut m = crate::infra::db::migrations::Migrator::migrations();
411        m.extend(modkit_db::outbox::outbox_migrations());
412        m
413    }
414}
415
416impl RestApiCapability for MiniChatModule {
417    fn register_rest(
418        &self,
419        _ctx: &ModuleCtx,
420        router: axum::Router,
421        openapi: &dyn OpenApiRegistry,
422    ) -> anyhow::Result<axum::Router> {
423        let services = self
424            .service
425            .get()
426            .ok_or_else(|| anyhow::anyhow!("{} not initialized", Self::MODULE_NAME))?;
427
428        info!("Registering mini-chat REST routes");
429        let prefix = self
430            .url_prefix
431            .get()
432            .ok_or_else(|| anyhow::anyhow!("{} not initialized (url_prefix)", Self::MODULE_NAME))?;
433
434        let router = routes::register_routes(router, openapi, Arc::clone(services), prefix);
435        info!("Mini-chat REST routes registered successfully");
436        Ok(router)
437    }
438}
439
#[async_trait]
impl RunnableCapability for MiniChatModule {
    /// Starts the module's runtime machinery, in this order:
    ///
    /// 1. prepares the background-worker runtime from `worker_configs`; the result
    ///    is later passed to `spawn_workers` as the leader elector;
    /// 2. registers OAGW upstreams, using a security context obtained by exchanging
    ///    the configured client credentials;
    /// 3. builds and starts the outbox pipeline, then wires it into the lazy enqueuer
    ///    created in `init()`;
    /// 4. spawns the background workers and stores their cancel token and handles.
    ///
    /// # Errors
    ///
    /// Fails in any of these cases:
    /// - `init()` has not run;
    /// - credential exchange or upstream registration fails;
    /// - `num_partitions` does not fit in `u16`, or the outbox fails to start;
    /// - a state mutex is poisoned;
    /// - workers cannot be spawned or stored.
    async fn start(&self, cancel: CancellationToken) -> anyhow::Result<()> {
        let wc = self.worker_configs.get().ok_or_else(|| {
            anyhow::anyhow!(
                "{} worker_configs not set - init() must run before start()",
                Self::MODULE_NAME
            )
        })?;
        let leader_elector = background_workers::prepare_worker_runtime(wc).await?;

        // Register OAGW upstreams now that GTS is in ready mode (post_init
        // has completed). During init() this fails because types-registry
        // list() only queries the persistent store which is empty until
        // switch_to_ready().
        if let Some(deferred) = self.oagw_deferred.get() {
            let ctx =
                exchange_client_credentials(&deferred.authn, &deferred.client_credentials).await?;
            // NOTE(review): `register_oagw_upstreams` takes `&mut providers`, but this
            // clone is dropped afterwards. The ProviderResolver built in init() keeps
            // the host-fallback aliases. Confirm that registration never assigns an
            // alias different from that fallback.
            let mut providers = deferred.providers.clone();
            crate::infra::oagw_provisioning::register_oagw_upstreams(
                &deferred.gateway,
                &ctx,
                &mut providers,
            )
            .await?;
        }

        // Start the outbox pipeline now that OAGW upstreams are registered.
        // Cleanup handlers can immediately call provider DELETE via OAGW.
        if let Some(od) = self.outbox_deferred.get() {
            let outbox_db = od.db.db();
            let num_partitions = od.outbox_config.num_partitions;
            let max_cleanup_attempts = od.cleanup_config.max_attempts;

            // The same partition layout is applied to every queue below.
            let partitions = Partitions::of(
                u16::try_from(num_partitions)
                    .map_err(|_| anyhow::anyhow!("num_partitions exceeds u16"))?,
            );

            // One queue per event kind, each with its own leased handler:
            // usage, attachment cleanup, chat cleanup, thread summary, audit.
            let outbox_handle = Outbox::builder(outbox_db)
                .queue(&od.outbox_config.queue_name, partitions)
                .leased(UsageEventHandler {
                    plugin_provider: od.model_policy_gw.clone(),
                })
                .queue(&od.outbox_config.cleanup_queue_name, partitions)
                .leased(
                    crate::infra::workers::cleanup_worker::AttachmentCleanupHandler::new(
                        Arc::clone(&od.file_storage),
                        Arc::clone(&od.db),
                        ChatRepository::new(modkit_db::odata::LimitCfg {
                            default: 20,
                            max: 100,
                        }),
                        max_cleanup_attempts,
                        Arc::clone(&od.metrics),
                    ),
                )
                .queue(&od.outbox_config.chat_cleanup_queue_name, partitions)
                .leased(
                    crate::infra::workers::cleanup_worker::ChatCleanupHandler::new(
                        Arc::clone(&od.file_storage),
                        Arc::clone(&od.vector_store_prov),
                        Arc::clone(&od.db),
                        ChatRepository::new(modkit_db::odata::LimitCfg {
                            default: 20,
                            max: 100,
                        }),
                        max_cleanup_attempts,
                        Arc::clone(&od.metrics),
                    ),
                )
                .queue(&od.outbox_config.thread_summary_queue_name, partitions)
                .leased(crate::infra::workers::thread_summary_worker::ThreadSummaryHandler)
                .queue(&od.outbox_config.audit_queue_name, partitions)
                .leased(AuditEventHandler {
                    audit_gateway: Arc::clone(&od.audit_gateway),
                    metrics: Arc::clone(&od.metrics),
                })
                // Lease duration of 60s; every other LeaseConfig field uses its default.
                .lease(LeaseConfig {
                    duration: Duration::from_secs(60),
                    ..LeaseConfig::default()
                })
                .start()
                .await
                .map_err(|e| anyhow::anyhow!("outbox start: {e}"))?;

            // Wire the outbox handle into the lazy enqueuer.
            od.enqueuer.set_outbox(Arc::clone(outbox_handle.outbox()));

            // The std MutexGuard lives until the end of this `if let` block. No
            // `.await` follows inside it. Any previously stored handle is overwritten
            // without `stop()` being called on it.
            let mut guard = self
                .outbox_handle
                .lock()
                .map_err(|e| anyhow::anyhow!("outbox_handle lock: {e}"))?;
            *guard = Some(outbox_handle);

            info!("Outbox pipeline started (OAGW ready)");
        }

        // The orphan watchdog reuses the service graph built in init(); it is only
        // wired up when enabled in config.
        let orphan_deps = if wc.orphan_watchdog.enabled {
            let services = self.service.get().ok_or_else(|| {
                anyhow::anyhow!(
                    "{} not initialized - init() must run before start()",
                    Self::MODULE_NAME
                )
            })?;
            Some(crate::infra::workers::orphan_watchdog::OrphanWatchdogDeps {
                finalization_svc: Arc::clone(&services.finalization),
                turn_repo: Arc::clone(&services.turn_repo),
                db: Arc::clone(&services.db),
                metrics: Arc::clone(&services.metrics),
            })
        } else {
            None
        };

        let (handles, worker_cancel) =
            background_workers::spawn_workers(wc, &cancel, leader_elector.as_ref(), orphan_deps)?;
        self.store_worker_runtime(handles, worker_cancel).await?;

        Ok(())
    }

    /// Stops the background workers first, then the outbox pipeline.
    ///
    /// The stored worker cancel token is cancelled. The handles are then joined via
    /// `join_all`, which receives `cancel` and `WORKER_STOP_TIMEOUT`. Finally the
    /// outbox `stop()` is raced against `cancel`.
    ///
    /// # Errors
    ///
    /// Fails only if one of the state mutexes is poisoned.
    async fn stop(&self, cancel: CancellationToken) -> anyhow::Result<()> {
        if let Some(worker_cancel) = self
            .worker_cancel
            .lock()
            .map_err(|e| anyhow::anyhow!("worker_cancel lock: {e}"))?
            .take()
        {
            worker_cancel.cancel();
        }

        // Take the handles out under the lock. The guard is dropped at the end of
        // this statement, before the `.await` below.
        let workers = self
            .worker_handles
            .lock()
            .map_err(|e| anyhow::anyhow!("worker_handles lock: {e}"))?
            .take();
        if let Some(handles) = workers {
            info!("Waiting for background workers to stop");
            handles.join_all(cancel.clone(), WORKER_STOP_TIMEOUT).await;
            info!("Background workers stopped");
        }

        let handle = self
            .outbox_handle
            .lock()
            .map_err(|e| anyhow::anyhow!("outbox_handle lock: {e}"))?
            .take();
        if let Some(handle) = handle {
            info!("Stopping outbox pipeline");
            tokio::select! {
                () = handle.stop() => {
                    info!("Outbox pipeline stopped");
                }
                () = cancel.cancelled() => {
                    info!("Outbox pipeline stop cancelled by framework deadline");
                }
            }
        }
        Ok(())
    }
}
602
603impl MiniChatModule {
604    async fn store_worker_runtime(
605        &self,
606        handles: WorkerHandles,
607        worker_cancel: CancellationToken,
608    ) -> anyhow::Result<()> {
609        let worker_cancel_cleanup = worker_cancel.clone();
610
611        // Store cancel token. Guard must not live across an await point.
612        let cancel_already_set = {
613            let mut guard = self
614                .worker_cancel
615                .lock()
616                .map_err(|e| anyhow::anyhow!("worker_cancel lock: {e}"))?;
617            if guard.is_some() {
618                true
619            } else {
620                *guard = Some(worker_cancel);
621                false
622            }
623            // guard dropped here — before any await
624        };
625        if cancel_already_set {
626            worker_cancel_cleanup.cancel();
627            let hard_stop = CancellationToken::new();
628            hard_stop.cancel();
629            handles.join_all(hard_stop, WORKER_STOP_TIMEOUT).await;
630            anyhow::bail!("{} worker_cancel already set", Self::MODULE_NAME);
631        }
632
633        // Store handles. Guard must not live across an await point.
634        let mut handles = Some(handles);
635        let handles_err = {
636            match self.worker_handles.lock() {
637                Ok(mut guard) => {
638                    if guard.is_some() {
639                        Some("worker_handles already set".to_owned())
640                    } else {
641                        *guard = handles.take();
642                        None
643                    }
644                }
645                Err(e) => Some(format!("worker_handles lock: {e}")),
646            }
647            // guard dropped here — before any await
648        };
649        if let Some(msg) = handles_err {
650            if let Ok(mut cancel_guard) = self.worker_cancel.lock() {
651                cancel_guard.take();
652            }
653            worker_cancel_cleanup.cancel();
654            if let Some(handles) = handles {
655                let hard_stop = CancellationToken::new();
656                hard_stop.cancel();
657                handles.join_all(hard_stop, WORKER_STOP_TIMEOUT).await;
658            }
659            // handles was either moved into the mutex (not the error case)
660            // or never stored. In the "already set" case it was moved, so
661            // we rely on the cancel token to stop workers; their JoinHandles
662            // will be cleaned up when the existing WorkerHandles is joined
663            // in stop().
664            anyhow::bail!("{} {msg}", Self::MODULE_NAME);
665        }
666        Ok(())
667    }
668}
669
670/// Exchange `OAuth2` client credentials via the `AuthN` resolver to obtain
671/// a `SecurityContext` for OAGW upstream provisioning.
672async fn exchange_client_credentials(
673    authn: &Arc<dyn AuthNResolverClient>,
674    creds: &crate::config::ClientCredentialsConfig,
675) -> anyhow::Result<modkit_security::SecurityContext> {
676    info!("Exchanging client credentials for OAGW provisioning context");
677    let request = ClientCredentialsRequest {
678        client_id: creds.client_id.clone(),
679        client_secret: creds.client_secret.clone(),
680        scopes: Vec::new(),
681    };
682    let result = authn
683        .exchange_client_credentials(&request)
684        .await
685        .map_err(|e| anyhow::anyhow!("client credentials exchange failed: {e}"))?;
686    info!("Security context obtained for OAGW provisioning");
687    Ok(result.security_context)
688}
689
690async fn register_plugin_schemas(
691    registry: &dyn TypesRegistryClient,
692    schemas: &[(String, &str, &str)],
693) -> anyhow::Result<()> {
694    let mut payload = Vec::with_capacity(schemas.len());
695    for (schema_str, schema_id, _label) in schemas {
696        let mut schema_json: serde_json::Value = serde_json::from_str(schema_str)?;
697        let obj = schema_json
698            .as_object_mut()
699            .ok_or_else(|| anyhow::anyhow!("schema {schema_id} is not a JSON object"))?;
700        obj.insert(
701            "additionalProperties".to_owned(),
702            serde_json::Value::Bool(false),
703        );
704        payload.push(schema_json);
705    }
706    let results = registry.register(payload).await?;
707    RegisterResult::ensure_all_ok(&results)?;
708    for (_schema_str, schema_id, label) in schemas {
709        info!(schema_id = %schema_id, "Registered {label} plugin schema in types-registry");
710    }
711    Ok(())
712}