1use std::sync::{Arc, Mutex, OnceLock};
2
3use async_trait::async_trait;
4use authn_resolver_sdk::{AuthNResolverClient, ClientCredentialsRequest};
5use authz_resolver_sdk::AuthZResolverClient;
6use mini_chat_sdk::{MiniChatAuditPluginSpecV1, MiniChatModelPolicyPluginSpecV1};
7use modkit::api::OpenApiRegistry;
8use modkit::contracts::RunnableCapability;
9use modkit::{DatabaseCapability, Module, ModuleCtx, RestApiCapability};
10use std::time::Duration;
11
12use modkit_db::outbox::{LeaseConfig, Outbox, OutboxHandle, Partitions};
13use oagw_sdk::ServiceGatewayClientV1;
14use sea_orm_migration::MigrationTrait;
15use tokio_util::sync::CancellationToken;
16use tracing::info;
17use types_registry_sdk::{RegisterResult, TypesRegistryClient};
18
19use crate::api::rest::routes;
20use crate::background_workers::{self, WORKER_STOP_TIMEOUT, WorkerConfigs};
21use crate::config::ProviderEntry;
22use crate::domain::ports::MiniChatMetricsPort;
23use crate::domain::service::{AppServices as GenericAppServices, Repositories};
24use crate::infra::metrics::MiniChatMetricsMeter;
25use crate::infra::outbox::{AuditEventHandler, InfraOutboxEnqueuer, UsageEventHandler};
26use crate::infra::workers::WorkerHandles;
27
28pub(crate) type AppServices = GenericAppServices<
29 TurnRepository,
30 MessageRepository,
31 QuotaUsageRepository,
32 ReactionRepository,
33 ChatRepository,
34 ThreadSummaryRepository,
35 AttachmentRepository,
36 VectorStoreRepository,
37 MessageAttachmentRepository,
38>;
39use crate::infra::audit_gateway::AuditGateway;
40use crate::infra::db::repo::attachment_repo::AttachmentRepository;
41use crate::infra::db::repo::chat_repo::ChatRepository;
42use crate::infra::db::repo::message_attachment_repo::MessageAttachmentRepository;
43use crate::infra::db::repo::message_repo::MessageRepository;
44use crate::infra::db::repo::quota_usage_repo::QuotaUsageRepository;
45use crate::infra::db::repo::reaction_repo::ReactionRepository;
46use crate::infra::db::repo::thread_summary_repo::ThreadSummaryRepository;
47use crate::infra::db::repo::turn_repo::TurnRepository;
48use crate::infra::db::repo::vector_store_repo::VectorStoreRepository;
49use crate::infra::llm::provider_resolver::ProviderResolver;
50use crate::infra::model_policy::ModelPolicyGateway;
51
/// URL prefix `/mini-chat`.
///
/// NOTE(review): not referenced in this file; presumably the fallback for the
/// `url_prefix` config value — confirm against `crate::config`.
pub const DEFAULT_URL_PREFIX: &str = "/mini-chat";
54
55#[modkit::module(
57 name = "mini-chat",
58 deps = ["types-registry", "authn-resolver", "authz-resolver", "oagw"],
59 capabilities = [db, rest, stateful],
60)]
61pub struct MiniChatModule {
62 service: OnceLock<Arc<AppServices>>,
63 url_prefix: OnceLock<String>,
64 outbox_handle: Mutex<Option<OutboxHandle>>,
65 oagw_deferred: OnceLock<OagwDeferred>,
67 worker_configs: OnceLock<WorkerConfigs>,
69 worker_cancel: Mutex<Option<CancellationToken>>,
70 worker_handles: Mutex<Option<WorkerHandles>>,
72 outbox_deferred: OnceLock<OutboxDeferred>,
74}
75
76struct OagwDeferred {
78 gateway: Arc<dyn ServiceGatewayClientV1>,
79 authn: Arc<dyn AuthNResolverClient>,
80 client_credentials: crate::config::ClientCredentialsConfig,
81 providers: std::collections::HashMap<String, ProviderEntry>,
82}
83
84struct OutboxDeferred {
87 db: Arc<crate::domain::service::DbProvider>,
88 outbox_config: crate::config::OutboxConfig,
89 cleanup_config: crate::config::background::CleanupWorkerConfig,
90 model_policy_gw: Arc<ModelPolicyGateway>,
91 audit_gateway: Arc<AuditGateway>,
92 file_storage: Arc<dyn crate::domain::ports::FileStorageProvider>,
93 vector_store_prov: Arc<dyn crate::domain::ports::VectorStoreProvider>,
94 metrics: Arc<dyn MiniChatMetricsPort>,
95 enqueuer: Arc<InfraOutboxEnqueuer>,
96 provider_resolver: Arc<crate::infra::llm::provider_resolver::ProviderResolver>,
97 model_resolver: Arc<dyn crate::domain::repos::ModelResolver>,
98 thread_summary_config: crate::config::background::ThreadSummaryWorkerConfig,
99}
100
101impl Default for MiniChatModule {
102 fn default() -> Self {
103 Self {
104 service: OnceLock::new(),
105 url_prefix: OnceLock::new(),
106 outbox_handle: Mutex::new(None),
107 oagw_deferred: OnceLock::new(),
108 worker_configs: OnceLock::new(),
109 worker_cancel: Mutex::new(None),
110 worker_handles: Mutex::new(None),
111 outbox_deferred: OnceLock::new(),
112 }
113 }
114}
115
116#[allow(clippy::too_many_lines)]
117#[async_trait]
118impl Module for MiniChatModule {
119 async fn init(&self, ctx: &ModuleCtx) -> anyhow::Result<()> {
120 info!("Initializing {} module", Self::MODULE_NAME);
121
122 let mut cfg: crate::config::MiniChatConfig = ctx.config_expanded_or_default()?;
123 cfg.streaming
124 .validate()
125 .map_err(|e| anyhow::anyhow!("streaming config: {e}"))?;
126 cfg.estimation_budgets
127 .validate()
128 .map_err(|e| anyhow::anyhow!("estimation_budgets config: {e}"))?;
129 cfg.quota
130 .validate()
131 .map_err(|e| anyhow::anyhow!("quota config: {e}"))?;
132 cfg.outbox
133 .validate()
134 .map_err(|e| anyhow::anyhow!("outbox config: {e}"))?;
135 cfg.context
136 .validate()
137 .map_err(|e| anyhow::anyhow!("context config: {e}"))?;
138 cfg.client_credentials
139 .validate()
140 .map_err(|e| anyhow::anyhow!("client_credentials config: {e}"))?;
141 for (id, entry) in &cfg.providers {
142 entry
143 .validate(id)
144 .map_err(|e| anyhow::anyhow!("providers config: {e}"))?;
145 }
146 cfg.orphan_watchdog
147 .validate()
148 .map_err(|e| anyhow::anyhow!("orphan_watchdog config: {e}"))?;
149 cfg.thread_summary_worker
150 .validate()
151 .map_err(|e| anyhow::anyhow!("thread_summary_worker config: {e}"))?;
152 cfg.cleanup_worker
153 .validate()
154 .map_err(|e| anyhow::anyhow!("cleanup_worker config: {e}"))?;
155 cfg.thumbnail
156 .validate()
157 .map_err(|e| anyhow::anyhow!("thumbnail config: {e}"))?;
158 cfg.rag
159 .validate()
160 .map_err(|e| anyhow::anyhow!("rag config: {e}"))?;
161
162 let vendor = cfg.vendor.trim().to_owned();
163 if vendor.is_empty() {
164 return Err(anyhow::anyhow!(
165 "{}: vendor must be a non-empty string",
166 Self::MODULE_NAME
167 ));
168 }
169
170 let registry = ctx.client_hub().get::<dyn TypesRegistryClient>()?;
171 register_plugin_schemas(
172 &*registry,
173 &[
174 (
175 MiniChatModelPolicyPluginSpecV1::gts_schema_with_refs_as_string(),
176 MiniChatModelPolicyPluginSpecV1::gts_schema_id(),
177 "model-policy",
178 ),
179 (
180 MiniChatAuditPluginSpecV1::gts_schema_with_refs_as_string(),
181 MiniChatAuditPluginSpecV1::gts_schema_id(),
182 "audit",
183 ),
184 ],
185 )
186 .await?;
187
188 self.url_prefix
189 .set(cfg.url_prefix)
190 .map_err(|_| anyhow::anyhow!("{} url_prefix already set", Self::MODULE_NAME))?;
191
192 let db_provider = ctx.db_required()?;
193 let db = Arc::new(db_provider);
194
195 let model_policy_gw = Arc::new(ModelPolicyGateway::new(ctx.client_hub(), vendor.clone()));
197
198 let audit_gateway = Arc::new(AuditGateway::new(ctx.client_hub(), vendor));
200
201 let authz = ctx
204 .client_hub()
205 .get::<dyn AuthZResolverClient>()
206 .map_err(|e| anyhow::anyhow!("failed to get AuthZ resolver: {e}"))?;
207
208 let authn_client = ctx
209 .client_hub()
210 .get::<dyn AuthNResolverClient>()
211 .map_err(|e| anyhow::anyhow!("failed to get AuthN resolver: {e}"))?;
212
213 let gateway = ctx
214 .client_hub()
215 .get::<dyn ServiceGatewayClientV1>()
216 .map_err(|e| anyhow::anyhow!("failed to get OAGW gateway: {e}"))?;
217
218 for entry in cfg.providers.values_mut() {
223 if entry.upstream_alias.is_none() {
224 entry.upstream_alias = Some(entry.host.clone());
225 }
226 for ovr in entry.tenant_overrides.values_mut() {
227 if ovr.upstream_alias.is_none()
228 && let Some(ref h) = ovr.host
229 {
230 ovr.upstream_alias = Some(h.clone());
231 }
232 }
233 }
234
235 drop(self.oagw_deferred.set(OagwDeferred {
238 gateway: Arc::clone(&gateway),
239 authn: Arc::clone(&authn_client),
240 client_credentials: cfg.client_credentials.clone(),
241 providers: cfg.providers.clone(),
242 }));
243
244 let provider_resolver = Arc::new(ProviderResolver::new(&gateway, cfg.providers));
245
246 let repos = Repositories {
247 chat: Arc::new(ChatRepository::new(modkit_db::odata::LimitCfg {
248 default: 20,
249 max: 100,
250 })),
251 attachment: Arc::new(AttachmentRepository),
252 message: Arc::new(MessageRepository::new(modkit_db::odata::LimitCfg {
253 default: 20,
254 max: 100,
255 })),
256 quota: Arc::new(QuotaUsageRepository),
257 turn: Arc::new(TurnRepository),
258 reaction: Arc::new(ReactionRepository),
259 thread_summary: Arc::new(ThreadSummaryRepository),
260 vector_store: Arc::new(VectorStoreRepository),
261 message_attachment: Arc::new(MessageAttachmentRepository),
262 };
263
264 let rag_client = Arc::new(
265 crate::infra::llm::providers::rag_http_client::RagHttpClient::new(Arc::clone(&gateway)),
266 );
267
268 let mut file_impls: std::collections::HashMap<
271 String,
272 Arc<dyn crate::domain::ports::FileStorageProvider>,
273 > = std::collections::HashMap::new();
274 let mut vs_impls: std::collections::HashMap<
275 String,
276 Arc<dyn crate::domain::ports::VectorStoreProvider>,
277 > = std::collections::HashMap::new();
278 for (provider_id, entry) in provider_resolver.entries() {
279 let (file, vs): (
280 Arc<dyn crate::domain::ports::FileStorageProvider>,
281 Arc<dyn crate::domain::ports::VectorStoreProvider>,
282 ) = match entry.storage_kind {
283 crate::config::StorageKind::Azure => {
284 let api_version = entry.api_version.clone().unwrap_or_else(|| {
285 panic!(
286 "provider '{provider_id}': storage_kind is 'azure' \
287 but api_version is not set"
288 )
289 });
290 (
291 Arc::new(
292 crate::infra::llm::providers::azure_file_storage::AzureFileStorage::new(
293 Arc::clone(&rag_client),
294 Arc::clone(&provider_resolver),
295 api_version.clone(),
296 ),
297 ),
298 Arc::new(
299 crate::infra::llm::providers::azure_vector_store::AzureVectorStore::new(
300 Arc::clone(&rag_client),
301 Arc::clone(&provider_resolver),
302 api_version,
303 ),
304 ),
305 )
306 }
307 crate::config::StorageKind::OpenAi => (
308 Arc::new(
309 crate::infra::llm::providers::openai_file_storage::OpenAiFileStorage::new(
310 Arc::clone(&rag_client),
311 Arc::clone(&provider_resolver),
312 ),
313 ),
314 Arc::new(
315 crate::infra::llm::providers::openai_vector_store::OpenAiVectorStore::new(
316 Arc::clone(&rag_client),
317 Arc::clone(&provider_resolver),
318 ),
319 ),
320 ),
321 };
322 file_impls.insert(provider_id.clone(), file);
323 vs_impls.insert(provider_id.clone(), vs);
324 }
325 let file_storage: Arc<dyn crate::domain::ports::FileStorageProvider> = Arc::new(
326 crate::infra::llm::providers::dispatching_storage::DispatchingFileStorage::new(
327 file_impls,
328 ),
329 );
330 let vector_store_prov: Arc<dyn crate::domain::ports::VectorStoreProvider> = Arc::new(
331 crate::infra::llm::providers::dispatching_storage::DispatchingVectorStore::new(
332 vs_impls,
333 ),
334 );
335
336 let metrics_prefix = cfg.metrics.effective_prefix(Self::MODULE_NAME);
339 let scope =
340 opentelemetry::InstrumentationScope::builder(Self::MODULE_NAME.to_owned()).build();
341 let metrics: Arc<dyn MiniChatMetricsPort> = Arc::new(MiniChatMetricsMeter::new(
342 &opentelemetry::global::meter_with_scope(scope),
343 &metrics_prefix,
344 ));
345
346 let outbox_enqueuer = Arc::new(InfraOutboxEnqueuer::new(
354 cfg.outbox.queue_name.clone(),
355 cfg.outbox.cleanup_queue_name.clone(),
356 cfg.outbox.chat_cleanup_queue_name.clone(),
357 cfg.outbox.thread_summary_queue_name.clone(),
358 cfg.outbox.audit_queue_name.clone(),
359 cfg.outbox.num_partitions,
360 ));
361
362 drop(self.outbox_deferred.set(OutboxDeferred {
364 db: Arc::clone(&db),
365 outbox_config: cfg.outbox,
366 cleanup_config: cfg.cleanup_worker,
367 model_policy_gw: model_policy_gw.clone(),
368 audit_gateway: Arc::clone(&audit_gateway),
369 file_storage: Arc::clone(&file_storage),
370 vector_store_prov: Arc::clone(&vector_store_prov),
371 metrics: Arc::clone(&metrics),
372 enqueuer: Arc::clone(&outbox_enqueuer),
373 provider_resolver: Arc::clone(&provider_resolver),
374 model_resolver: model_policy_gw.clone() as Arc<dyn crate::domain::repos::ModelResolver>,
375 thread_summary_config: cfg.thread_summary_worker.clone(),
376 }));
377
378 let services = Arc::new(AppServices::new(
381 &repos,
382 db,
383 authz,
384 &(model_policy_gw.clone() as Arc<dyn crate::domain::repos::ModelResolver>),
385 &provider_resolver,
386 cfg.streaming,
387 model_policy_gw.clone() as Arc<dyn crate::domain::repos::PolicySnapshotProvider>,
388 model_policy_gw as Arc<dyn crate::domain::repos::UserLimitsProvider>,
389 cfg.estimation_budgets,
390 cfg.quota,
391 &(outbox_enqueuer as Arc<dyn crate::domain::repos::OutboxEnqueuer>),
392 cfg.context,
393 file_storage,
394 vector_store_prov,
395 cfg.rag,
396 cfg.thumbnail,
397 metrics,
398 cfg.thread_summary_worker,
399 ));
400
401 self.service
402 .set(services)
403 .map_err(|_| anyhow::anyhow!("{} module already initialized", Self::MODULE_NAME))?;
404
405 self.worker_configs
406 .set(WorkerConfigs {
407 orphan_watchdog: cfg.orphan_watchdog,
408 })
409 .map_err(|_| anyhow::anyhow!("{} worker_configs already set", Self::MODULE_NAME))?;
410
411 info!("{} module initialized successfully", Self::MODULE_NAME);
412 Ok(())
413 }
414}
415
416impl DatabaseCapability for MiniChatModule {
417 fn migrations(&self) -> Vec<Box<dyn MigrationTrait>> {
418 use sea_orm_migration::MigratorTrait;
419 info!("Providing mini-chat database migrations");
420 let mut m = crate::infra::db::migrations::Migrator::migrations();
421 m.extend(modkit_db::outbox::outbox_migrations());
422 m
423 }
424}
425
426impl RestApiCapability for MiniChatModule {
427 fn register_rest(
428 &self,
429 _ctx: &ModuleCtx,
430 router: axum::Router,
431 openapi: &dyn OpenApiRegistry,
432 ) -> anyhow::Result<axum::Router> {
433 let services = self
434 .service
435 .get()
436 .ok_or_else(|| anyhow::anyhow!("{} not initialized", Self::MODULE_NAME))?;
437
438 info!("Registering mini-chat REST routes");
439 let prefix = self
440 .url_prefix
441 .get()
442 .ok_or_else(|| anyhow::anyhow!("{} not initialized (url_prefix)", Self::MODULE_NAME))?;
443
444 let router = routes::register_routes(router, openapi, Arc::clone(services), prefix);
445 info!("Mini-chat REST routes registered successfully");
446 Ok(router)
447 }
448}
449
450#[async_trait]
451impl RunnableCapability for MiniChatModule {
452 async fn start(&self, cancel: CancellationToken) -> anyhow::Result<()> {
453 let wc = self.worker_configs.get().ok_or_else(|| {
454 anyhow::anyhow!(
455 "{} worker_configs not set - init() must run before start()",
456 Self::MODULE_NAME
457 )
458 })?;
459 let leader_elector = background_workers::prepare_worker_runtime(wc).await?;
460
461 if let Some(deferred) = self.oagw_deferred.get() {
466 let ctx =
467 exchange_client_credentials(&deferred.authn, &deferred.client_credentials).await?;
468 let mut providers = deferred.providers.clone();
469 crate::infra::oagw_provisioning::register_oagw_upstreams(
470 &deferred.gateway,
471 &ctx,
472 &mut providers,
473 )
474 .await?;
475 }
476
477 if let Some(od) = self.outbox_deferred.get() {
480 let outbox_db = od.db.db();
481 let num_partitions = od.outbox_config.num_partitions;
482 let max_cleanup_attempts = od.cleanup_config.max_attempts;
483
484 let partitions = Partitions::of(
485 u16::try_from(num_partitions)
486 .map_err(|_| anyhow::anyhow!("num_partitions exceeds u16"))?,
487 );
488
489 let outbox_handle = Outbox::builder(outbox_db)
490 .queue(&od.outbox_config.queue_name, partitions)
491 .leased(UsageEventHandler {
492 plugin_provider: od.model_policy_gw.clone(),
493 })
494 .queue(&od.outbox_config.cleanup_queue_name, partitions)
495 .leased(
496 crate::infra::workers::cleanup_worker::AttachmentCleanupHandler::new(
497 Arc::clone(&od.file_storage),
498 Arc::clone(&od.db),
499 ChatRepository::new(modkit_db::odata::LimitCfg {
500 default: 20,
501 max: 100,
502 }),
503 max_cleanup_attempts,
504 Arc::clone(&od.metrics),
505 ),
506 )
507 .queue(&od.outbox_config.chat_cleanup_queue_name, partitions)
508 .leased(
509 crate::infra::workers::cleanup_worker::ChatCleanupHandler::new(
510 Arc::clone(&od.file_storage),
511 Arc::clone(&od.vector_store_prov),
512 Arc::clone(&od.db),
513 ChatRepository::new(modkit_db::odata::LimitCfg {
514 default: 20,
515 max: 100,
516 }),
517 max_cleanup_attempts,
518 Arc::clone(&od.metrics),
519 ),
520 )
521 .queue(&od.outbox_config.thread_summary_queue_name, partitions)
522 .leased(
523 crate::infra::workers::thread_summary_worker::ThreadSummaryHandler::new(
524 Arc::new(
525 crate::infra::workers::thread_summary_worker::ThreadSummaryDeps {
526 db: Arc::clone(&od.db),
527 thread_summary_repo: Arc::new(ThreadSummaryRepository),
528 message_repo: Arc::new(MessageRepository::new(
529 modkit_db::odata::LimitCfg {
530 default: 20,
531 max: 100,
532 },
533 )),
534 outbox_enqueuer: Arc::clone(&od.enqueuer)
535 as Arc<dyn crate::domain::repos::OutboxEnqueuer>,
536 metrics: Arc::clone(&od.metrics),
537 provider_resolver: Arc::clone(&od.provider_resolver),
538 model_resolver: Arc::clone(&od.model_resolver),
539 config: od.thread_summary_config.clone(),
540 },
541 ),
542 ),
543 )
544 .queue(&od.outbox_config.audit_queue_name, partitions)
545 .leased(AuditEventHandler {
546 audit_gateway: Arc::clone(&od.audit_gateway),
547 metrics: Arc::clone(&od.metrics),
548 })
549 .lease(LeaseConfig {
550 duration: Duration::from_mins(1),
551 ..LeaseConfig::default()
552 })
553 .start()
554 .await
555 .map_err(|e| anyhow::anyhow!("outbox start: {e}"))?;
556
557 od.enqueuer.set_outbox(Arc::clone(outbox_handle.outbox()));
559
560 let mut guard = self
561 .outbox_handle
562 .lock()
563 .map_err(|e| anyhow::anyhow!("outbox_handle lock: {e}"))?;
564 *guard = Some(outbox_handle);
565
566 info!("Outbox pipeline started (OAGW ready)");
567 }
568
569 let orphan_deps = if wc.orphan_watchdog.enabled {
570 let services = self.service.get().ok_or_else(|| {
571 anyhow::anyhow!(
572 "{} not initialized - init() must run before start()",
573 Self::MODULE_NAME
574 )
575 })?;
576 Some(crate::infra::workers::orphan_watchdog::OrphanWatchdogDeps {
577 finalization_svc: Arc::clone(&services.finalization),
578 turn_repo: Arc::clone(&services.turn_repo),
579 db: Arc::clone(&services.db),
580 metrics: Arc::clone(&services.metrics),
581 })
582 } else {
583 None
584 };
585
586 let (handles, worker_cancel) =
587 background_workers::spawn_workers(wc, &cancel, leader_elector.as_ref(), orphan_deps)?;
588 self.store_worker_runtime(handles, worker_cancel).await?;
589
590 Ok(())
591 }
592
593 async fn stop(&self, cancel: CancellationToken) -> anyhow::Result<()> {
594 if let Some(worker_cancel) = self
595 .worker_cancel
596 .lock()
597 .map_err(|e| anyhow::anyhow!("worker_cancel lock: {e}"))?
598 .take()
599 {
600 worker_cancel.cancel();
601 }
602
603 let workers = self
604 .worker_handles
605 .lock()
606 .map_err(|e| anyhow::anyhow!("worker_handles lock: {e}"))?
607 .take();
608 if let Some(handles) = workers {
609 info!("Waiting for background workers to stop");
610 handles.join_all(cancel.clone(), WORKER_STOP_TIMEOUT).await;
611 info!("Background workers stopped");
612 }
613
614 let handle = self
615 .outbox_handle
616 .lock()
617 .map_err(|e| anyhow::anyhow!("outbox_handle lock: {e}"))?
618 .take();
619 if let Some(handle) = handle {
620 info!("Stopping outbox pipeline");
621 tokio::select! {
622 () = handle.stop() => {
623 info!("Outbox pipeline stopped");
624 }
625 () = cancel.cancelled() => {
626 info!("Outbox pipeline stop cancelled by framework deadline");
627 }
628 }
629 }
630 Ok(())
631 }
632}
633
634impl MiniChatModule {
635 async fn store_worker_runtime(
636 &self,
637 handles: WorkerHandles,
638 worker_cancel: CancellationToken,
639 ) -> anyhow::Result<()> {
640 let worker_cancel_cleanup = worker_cancel.clone();
641
642 let cancel_already_set = {
644 let mut guard = self
645 .worker_cancel
646 .lock()
647 .map_err(|e| anyhow::anyhow!("worker_cancel lock: {e}"))?;
648 if guard.is_some() {
649 true
650 } else {
651 *guard = Some(worker_cancel);
652 false
653 }
654 };
656 if cancel_already_set {
657 worker_cancel_cleanup.cancel();
658 let hard_stop = CancellationToken::new();
659 hard_stop.cancel();
660 handles.join_all(hard_stop, WORKER_STOP_TIMEOUT).await;
661 anyhow::bail!("{} worker_cancel already set", Self::MODULE_NAME);
662 }
663
664 let mut handles = Some(handles);
666 let handles_err = {
667 match self.worker_handles.lock() {
668 Ok(mut guard) => {
669 if guard.is_some() {
670 Some("worker_handles already set".to_owned())
671 } else {
672 *guard = handles.take();
673 None
674 }
675 }
676 Err(e) => Some(format!("worker_handles lock: {e}")),
677 }
678 };
680 if let Some(msg) = handles_err {
681 if let Ok(mut cancel_guard) = self.worker_cancel.lock() {
682 cancel_guard.take();
683 }
684 worker_cancel_cleanup.cancel();
685 if let Some(handles) = handles {
686 let hard_stop = CancellationToken::new();
687 hard_stop.cancel();
688 handles.join_all(hard_stop, WORKER_STOP_TIMEOUT).await;
689 }
690 anyhow::bail!("{} {msg}", Self::MODULE_NAME);
696 }
697 Ok(())
698 }
699}
700
701async fn exchange_client_credentials(
704 authn: &Arc<dyn AuthNResolverClient>,
705 creds: &crate::config::ClientCredentialsConfig,
706) -> anyhow::Result<modkit_security::SecurityContext> {
707 info!("Exchanging client credentials for OAGW provisioning context");
708 let request = ClientCredentialsRequest {
709 client_id: creds.client_id.clone(),
710 client_secret: creds.client_secret.clone(),
711 scopes: Vec::new(),
712 };
713 let result = authn
714 .exchange_client_credentials(&request)
715 .await
716 .map_err(|e| anyhow::anyhow!("client credentials exchange failed: {e}"))?;
717 info!("Security context obtained for OAGW provisioning");
718 Ok(result.security_context)
719}
720
721async fn register_plugin_schemas(
722 registry: &dyn TypesRegistryClient,
723 schemas: &[(String, &str, &str)],
724) -> anyhow::Result<()> {
725 let mut payload = Vec::with_capacity(schemas.len());
726 for (schema_str, schema_id, _label) in schemas {
727 let mut schema_json: serde_json::Value = serde_json::from_str(schema_str)?;
728 let obj = schema_json
729 .as_object_mut()
730 .ok_or_else(|| anyhow::anyhow!("schema {schema_id} is not a JSON object"))?;
731 obj.insert(
732 "additionalProperties".to_owned(),
733 serde_json::Value::Bool(false),
734 );
735 payload.push(schema_json);
736 }
737 let results = registry.register(payload).await?;
738 RegisterResult::ensure_all_ok(&results)?;
739 for (_schema_str, schema_id, label) in schemas {
740 info!(schema_id = %schema_id, "Registered {label} plugin schema in types-registry");
741 }
742 Ok(())
743}