1use std::sync::{Arc, Mutex, OnceLock};
2
3use async_trait::async_trait;
4use authn_resolver_sdk::{AuthNResolverClient, ClientCredentialsRequest};
5use authz_resolver_sdk::AuthZResolverClient;
6use mini_chat_sdk::{MiniChatAuditPluginSpecV1, MiniChatModelPolicyPluginSpecV1};
7use modkit::api::OpenApiRegistry;
8use modkit::contracts::RunnableCapability;
9use modkit::{DatabaseCapability, Module, ModuleCtx, RestApiCapability};
10use std::time::Duration;
11
12use modkit_db::outbox::{LeaseConfig, Outbox, OutboxHandle, Partitions};
13use oagw_sdk::ServiceGatewayClientV1;
14use sea_orm_migration::MigrationTrait;
15use tokio_util::sync::CancellationToken;
16use tracing::info;
17use types_registry_sdk::{RegisterResult, TypesRegistryClient};
18
19use crate::api::rest::routes;
20use crate::background_workers::{self, WORKER_STOP_TIMEOUT, WorkerConfigs};
21use crate::config::ProviderEntry;
22use crate::domain::ports::MiniChatMetricsPort;
23use crate::domain::service::{AppServices as GenericAppServices, Repositories};
24use crate::infra::metrics::MiniChatMetricsMeter;
25use crate::infra::outbox::{AuditEventHandler, InfraOutboxEnqueuer, UsageEventHandler};
26use crate::infra::workers::WorkerHandles;
27
28pub(crate) type AppServices = GenericAppServices<
29 TurnRepository,
30 MessageRepository,
31 QuotaUsageRepository,
32 ReactionRepository,
33 ChatRepository,
34 ThreadSummaryRepository,
35 AttachmentRepository,
36 VectorStoreRepository,
37 MessageAttachmentRepository,
38>;
39use crate::infra::audit_gateway::AuditGateway;
40use crate::infra::db::repo::attachment_repo::AttachmentRepository;
41use crate::infra::db::repo::chat_repo::ChatRepository;
42use crate::infra::db::repo::message_attachment_repo::MessageAttachmentRepository;
43use crate::infra::db::repo::message_repo::MessageRepository;
44use crate::infra::db::repo::quota_usage_repo::QuotaUsageRepository;
45use crate::infra::db::repo::reaction_repo::ReactionRepository;
46use crate::infra::db::repo::thread_summary_repo::ThreadSummaryRepository;
47use crate::infra::db::repo::turn_repo::TurnRepository;
48use crate::infra::db::repo::vector_store_repo::VectorStoreRepository;
49use crate::infra::llm::provider_resolver::ProviderResolver;
50use crate::infra::model_policy::ModelPolicyGateway;
51
52pub const DEFAULT_URL_PREFIX: &str = "/mini-chat";
54
55#[modkit::module(
57 name = "mini-chat",
58 deps = ["types-registry", "authn-resolver", "authz-resolver", "oagw"],
59 capabilities = [db, rest, stateful],
60)]
61pub struct MiniChatModule {
62 service: OnceLock<Arc<AppServices>>,
63 url_prefix: OnceLock<String>,
64 outbox_handle: Mutex<Option<OutboxHandle>>,
65 oagw_deferred: OnceLock<OagwDeferred>,
67 worker_configs: OnceLock<WorkerConfigs>,
69 worker_cancel: Mutex<Option<CancellationToken>>,
70 worker_handles: Mutex<Option<WorkerHandles>>,
72 outbox_deferred: OnceLock<OutboxDeferred>,
74}
75
76struct OagwDeferred {
78 gateway: Arc<dyn ServiceGatewayClientV1>,
79 authn: Arc<dyn AuthNResolverClient>,
80 client_credentials: crate::config::ClientCredentialsConfig,
81 providers: std::collections::HashMap<String, ProviderEntry>,
82}
83
84struct OutboxDeferred {
87 db: Arc<crate::domain::service::DbProvider>,
88 outbox_config: crate::config::OutboxConfig,
89 cleanup_config: crate::config::background::CleanupWorkerConfig,
90 model_policy_gw: Arc<ModelPolicyGateway>,
91 audit_gateway: Arc<AuditGateway>,
92 file_storage: Arc<dyn crate::domain::ports::FileStorageProvider>,
93 vector_store_prov: Arc<dyn crate::domain::ports::VectorStoreProvider>,
94 metrics: Arc<dyn MiniChatMetricsPort>,
95 enqueuer: Arc<InfraOutboxEnqueuer>,
96}
97
98impl Default for MiniChatModule {
99 fn default() -> Self {
100 Self {
101 service: OnceLock::new(),
102 url_prefix: OnceLock::new(),
103 outbox_handle: Mutex::new(None),
104 oagw_deferred: OnceLock::new(),
105 worker_configs: OnceLock::new(),
106 worker_cancel: Mutex::new(None),
107 worker_handles: Mutex::new(None),
108 outbox_deferred: OnceLock::new(),
109 }
110 }
111}
112
113#[allow(clippy::too_many_lines)]
114#[async_trait]
115impl Module for MiniChatModule {
116 async fn init(&self, ctx: &ModuleCtx) -> anyhow::Result<()> {
117 info!("Initializing {} module", Self::MODULE_NAME);
118
119 let mut cfg: crate::config::MiniChatConfig = ctx.config_expanded()?;
120 cfg.streaming
121 .validate()
122 .map_err(|e| anyhow::anyhow!("streaming config: {e}"))?;
123 cfg.estimation_budgets
124 .validate()
125 .map_err(|e| anyhow::anyhow!("estimation_budgets config: {e}"))?;
126 cfg.quota
127 .validate()
128 .map_err(|e| anyhow::anyhow!("quota config: {e}"))?;
129 cfg.outbox
130 .validate()
131 .map_err(|e| anyhow::anyhow!("outbox config: {e}"))?;
132 cfg.context
133 .validate()
134 .map_err(|e| anyhow::anyhow!("context config: {e}"))?;
135 cfg.client_credentials
136 .validate()
137 .map_err(|e| anyhow::anyhow!("client_credentials config: {e}"))?;
138 for (id, entry) in &cfg.providers {
139 entry
140 .validate(id)
141 .map_err(|e| anyhow::anyhow!("providers config: {e}"))?;
142 }
143 cfg.orphan_watchdog
144 .validate()
145 .map_err(|e| anyhow::anyhow!("orphan_watchdog config: {e}"))?;
146 cfg.thread_summary_worker
147 .validate()
148 .map_err(|e| anyhow::anyhow!("thread_summary_worker config: {e}"))?;
149 cfg.cleanup_worker
150 .validate()
151 .map_err(|e| anyhow::anyhow!("cleanup_worker config: {e}"))?;
152 cfg.thumbnail
153 .validate()
154 .map_err(|e| anyhow::anyhow!("thumbnail config: {e}"))?;
155
156 let vendor = cfg.vendor.trim().to_owned();
157 if vendor.is_empty() {
158 return Err(anyhow::anyhow!(
159 "{}: vendor must be a non-empty string",
160 Self::MODULE_NAME
161 ));
162 }
163
164 let registry = ctx.client_hub().get::<dyn TypesRegistryClient>()?;
165 register_plugin_schemas(
166 &*registry,
167 &[
168 (
169 MiniChatModelPolicyPluginSpecV1::gts_schema_with_refs_as_string(),
170 MiniChatModelPolicyPluginSpecV1::gts_schema_id(),
171 "model-policy",
172 ),
173 (
174 MiniChatAuditPluginSpecV1::gts_schema_with_refs_as_string(),
175 MiniChatAuditPluginSpecV1::gts_schema_id(),
176 "audit",
177 ),
178 ],
179 )
180 .await?;
181
182 self.url_prefix
183 .set(cfg.url_prefix)
184 .map_err(|_| anyhow::anyhow!("{} url_prefix already set", Self::MODULE_NAME))?;
185
186 let db_provider = ctx.db_required()?;
187 let db = Arc::new(db_provider);
188
189 let model_policy_gw = Arc::new(ModelPolicyGateway::new(ctx.client_hub(), vendor.clone()));
191
192 let audit_gateway = Arc::new(AuditGateway::new(ctx.client_hub(), vendor));
194
195 let authz = ctx
198 .client_hub()
199 .get::<dyn AuthZResolverClient>()
200 .map_err(|e| anyhow::anyhow!("failed to get AuthZ resolver: {e}"))?;
201
202 let authn_client = ctx
203 .client_hub()
204 .get::<dyn AuthNResolverClient>()
205 .map_err(|e| anyhow::anyhow!("failed to get AuthN resolver: {e}"))?;
206
207 let gateway = ctx
208 .client_hub()
209 .get::<dyn ServiceGatewayClientV1>()
210 .map_err(|e| anyhow::anyhow!("failed to get OAGW gateway: {e}"))?;
211
212 for entry in cfg.providers.values_mut() {
217 if entry.upstream_alias.is_none() {
218 entry.upstream_alias = Some(entry.host.clone());
219 }
220 for ovr in entry.tenant_overrides.values_mut() {
221 if ovr.upstream_alias.is_none()
222 && let Some(ref h) = ovr.host
223 {
224 ovr.upstream_alias = Some(h.clone());
225 }
226 }
227 }
228
229 drop(self.oagw_deferred.set(OagwDeferred {
232 gateway: Arc::clone(&gateway),
233 authn: Arc::clone(&authn_client),
234 client_credentials: cfg.client_credentials.clone(),
235 providers: cfg.providers.clone(),
236 }));
237
238 let provider_resolver = Arc::new(ProviderResolver::new(&gateway, cfg.providers));
239
240 let repos = Repositories {
241 chat: Arc::new(ChatRepository::new(modkit_db::odata::LimitCfg {
242 default: 20,
243 max: 100,
244 })),
245 attachment: Arc::new(AttachmentRepository),
246 message: Arc::new(MessageRepository::new(modkit_db::odata::LimitCfg {
247 default: 20,
248 max: 100,
249 })),
250 quota: Arc::new(QuotaUsageRepository),
251 turn: Arc::new(TurnRepository),
252 reaction: Arc::new(ReactionRepository),
253 thread_summary: Arc::new(ThreadSummaryRepository),
254 vector_store: Arc::new(VectorStoreRepository),
255 message_attachment: Arc::new(MessageAttachmentRepository),
256 };
257
258 let rag_client = Arc::new(
259 crate::infra::llm::providers::rag_http_client::RagHttpClient::new(Arc::clone(&gateway)),
260 );
261
262 let mut file_impls: std::collections::HashMap<
265 String,
266 Arc<dyn crate::domain::ports::FileStorageProvider>,
267 > = std::collections::HashMap::new();
268 let mut vs_impls: std::collections::HashMap<
269 String,
270 Arc<dyn crate::domain::ports::VectorStoreProvider>,
271 > = std::collections::HashMap::new();
272 for (provider_id, entry) in provider_resolver.entries() {
273 let (file, vs): (
274 Arc<dyn crate::domain::ports::FileStorageProvider>,
275 Arc<dyn crate::domain::ports::VectorStoreProvider>,
276 ) = match entry.storage_kind {
277 crate::config::StorageKind::Azure => {
278 let api_version = entry.api_version.clone().unwrap_or_else(|| {
279 panic!(
280 "provider '{provider_id}': storage_kind is 'azure' \
281 but api_version is not set"
282 )
283 });
284 (
285 Arc::new(
286 crate::infra::llm::providers::azure_file_storage::AzureFileStorage::new(
287 Arc::clone(&rag_client),
288 Arc::clone(&provider_resolver),
289 api_version.clone(),
290 ),
291 ),
292 Arc::new(
293 crate::infra::llm::providers::azure_vector_store::AzureVectorStore::new(
294 Arc::clone(&rag_client),
295 Arc::clone(&provider_resolver),
296 api_version,
297 ),
298 ),
299 )
300 }
301 crate::config::StorageKind::OpenAi => (
302 Arc::new(
303 crate::infra::llm::providers::openai_file_storage::OpenAiFileStorage::new(
304 Arc::clone(&rag_client),
305 Arc::clone(&provider_resolver),
306 ),
307 ),
308 Arc::new(
309 crate::infra::llm::providers::openai_vector_store::OpenAiVectorStore::new(
310 Arc::clone(&rag_client),
311 Arc::clone(&provider_resolver),
312 ),
313 ),
314 ),
315 };
316 file_impls.insert(provider_id.clone(), file);
317 vs_impls.insert(provider_id.clone(), vs);
318 }
319 let file_storage: Arc<dyn crate::domain::ports::FileStorageProvider> = Arc::new(
320 crate::infra::llm::providers::dispatching_storage::DispatchingFileStorage::new(
321 file_impls,
322 ),
323 );
324 let vector_store_prov: Arc<dyn crate::domain::ports::VectorStoreProvider> = Arc::new(
325 crate::infra::llm::providers::dispatching_storage::DispatchingVectorStore::new(
326 vs_impls,
327 ),
328 );
329
330 let metrics_prefix = cfg.metrics.effective_prefix(Self::MODULE_NAME);
333 let scope =
334 opentelemetry::InstrumentationScope::builder(Self::MODULE_NAME.to_owned()).build();
335 let metrics: Arc<dyn MiniChatMetricsPort> = Arc::new(MiniChatMetricsMeter::new(
336 &opentelemetry::global::meter_with_scope(scope),
337 &metrics_prefix,
338 ));
339
340 let outbox_enqueuer = Arc::new(InfraOutboxEnqueuer::new(
348 cfg.outbox.queue_name.clone(),
349 cfg.outbox.cleanup_queue_name.clone(),
350 cfg.outbox.chat_cleanup_queue_name.clone(),
351 cfg.outbox.thread_summary_queue_name.clone(),
352 cfg.outbox.audit_queue_name.clone(),
353 cfg.outbox.num_partitions,
354 ));
355
356 drop(self.outbox_deferred.set(OutboxDeferred {
358 db: Arc::clone(&db),
359 outbox_config: cfg.outbox,
360 cleanup_config: cfg.cleanup_worker,
361 model_policy_gw: model_policy_gw.clone(),
362 audit_gateway: Arc::clone(&audit_gateway),
363 file_storage: Arc::clone(&file_storage),
364 vector_store_prov: Arc::clone(&vector_store_prov),
365 metrics: Arc::clone(&metrics),
366 enqueuer: Arc::clone(&outbox_enqueuer),
367 }));
368
369 let services = Arc::new(AppServices::new(
372 &repos,
373 db,
374 authz,
375 &(model_policy_gw.clone() as Arc<dyn crate::domain::repos::ModelResolver>),
376 &provider_resolver,
377 cfg.streaming,
378 model_policy_gw.clone() as Arc<dyn crate::domain::repos::PolicySnapshotProvider>,
379 model_policy_gw as Arc<dyn crate::domain::repos::UserLimitsProvider>,
380 cfg.estimation_budgets,
381 cfg.quota,
382 &(outbox_enqueuer as Arc<dyn crate::domain::repos::OutboxEnqueuer>),
383 cfg.context,
384 file_storage,
385 vector_store_prov,
386 cfg.rag,
387 cfg.thumbnail,
388 metrics,
389 ));
390
391 self.service
392 .set(services)
393 .map_err(|_| anyhow::anyhow!("{} module already initialized", Self::MODULE_NAME))?;
394
395 self.worker_configs
396 .set(WorkerConfigs {
397 orphan_watchdog: cfg.orphan_watchdog,
398 })
399 .map_err(|_| anyhow::anyhow!("{} worker_configs already set", Self::MODULE_NAME))?;
400
401 info!("{} module initialized successfully", Self::MODULE_NAME);
402 Ok(())
403 }
404}
405
406impl DatabaseCapability for MiniChatModule {
407 fn migrations(&self) -> Vec<Box<dyn MigrationTrait>> {
408 use sea_orm_migration::MigratorTrait;
409 info!("Providing mini-chat database migrations");
410 let mut m = crate::infra::db::migrations::Migrator::migrations();
411 m.extend(modkit_db::outbox::outbox_migrations());
412 m
413 }
414}
415
416impl RestApiCapability for MiniChatModule {
417 fn register_rest(
418 &self,
419 _ctx: &ModuleCtx,
420 router: axum::Router,
421 openapi: &dyn OpenApiRegistry,
422 ) -> anyhow::Result<axum::Router> {
423 let services = self
424 .service
425 .get()
426 .ok_or_else(|| anyhow::anyhow!("{} not initialized", Self::MODULE_NAME))?;
427
428 info!("Registering mini-chat REST routes");
429 let prefix = self
430 .url_prefix
431 .get()
432 .ok_or_else(|| anyhow::anyhow!("{} not initialized (url_prefix)", Self::MODULE_NAME))?;
433
434 let router = routes::register_routes(router, openapi, Arc::clone(services), prefix);
435 info!("Mini-chat REST routes registered successfully");
436 Ok(router)
437 }
438}
439
440#[async_trait]
441impl RunnableCapability for MiniChatModule {
442 async fn start(&self, cancel: CancellationToken) -> anyhow::Result<()> {
443 let wc = self.worker_configs.get().ok_or_else(|| {
444 anyhow::anyhow!(
445 "{} worker_configs not set - init() must run before start()",
446 Self::MODULE_NAME
447 )
448 })?;
449 let leader_elector = background_workers::prepare_worker_runtime(wc).await?;
450
451 if let Some(deferred) = self.oagw_deferred.get() {
456 let ctx =
457 exchange_client_credentials(&deferred.authn, &deferred.client_credentials).await?;
458 let mut providers = deferred.providers.clone();
459 crate::infra::oagw_provisioning::register_oagw_upstreams(
460 &deferred.gateway,
461 &ctx,
462 &mut providers,
463 )
464 .await?;
465 }
466
467 if let Some(od) = self.outbox_deferred.get() {
470 let outbox_db = od.db.db();
471 let num_partitions = od.outbox_config.num_partitions;
472 let max_cleanup_attempts = od.cleanup_config.max_attempts;
473
474 let partitions = Partitions::of(
475 u16::try_from(num_partitions)
476 .map_err(|_| anyhow::anyhow!("num_partitions exceeds u16"))?,
477 );
478
479 let outbox_handle = Outbox::builder(outbox_db)
480 .queue(&od.outbox_config.queue_name, partitions)
481 .leased(UsageEventHandler {
482 plugin_provider: od.model_policy_gw.clone(),
483 })
484 .queue(&od.outbox_config.cleanup_queue_name, partitions)
485 .leased(
486 crate::infra::workers::cleanup_worker::AttachmentCleanupHandler::new(
487 Arc::clone(&od.file_storage),
488 Arc::clone(&od.db),
489 ChatRepository::new(modkit_db::odata::LimitCfg {
490 default: 20,
491 max: 100,
492 }),
493 max_cleanup_attempts,
494 Arc::clone(&od.metrics),
495 ),
496 )
497 .queue(&od.outbox_config.chat_cleanup_queue_name, partitions)
498 .leased(
499 crate::infra::workers::cleanup_worker::ChatCleanupHandler::new(
500 Arc::clone(&od.file_storage),
501 Arc::clone(&od.vector_store_prov),
502 Arc::clone(&od.db),
503 ChatRepository::new(modkit_db::odata::LimitCfg {
504 default: 20,
505 max: 100,
506 }),
507 max_cleanup_attempts,
508 Arc::clone(&od.metrics),
509 ),
510 )
511 .queue(&od.outbox_config.thread_summary_queue_name, partitions)
512 .leased(crate::infra::workers::thread_summary_worker::ThreadSummaryHandler)
513 .queue(&od.outbox_config.audit_queue_name, partitions)
514 .leased(AuditEventHandler {
515 audit_gateway: Arc::clone(&od.audit_gateway),
516 metrics: Arc::clone(&od.metrics),
517 })
518 .lease(LeaseConfig {
519 duration: Duration::from_secs(60),
520 ..LeaseConfig::default()
521 })
522 .start()
523 .await
524 .map_err(|e| anyhow::anyhow!("outbox start: {e}"))?;
525
526 od.enqueuer.set_outbox(Arc::clone(outbox_handle.outbox()));
528
529 let mut guard = self
530 .outbox_handle
531 .lock()
532 .map_err(|e| anyhow::anyhow!("outbox_handle lock: {e}"))?;
533 *guard = Some(outbox_handle);
534
535 info!("Outbox pipeline started (OAGW ready)");
536 }
537
538 let orphan_deps = if wc.orphan_watchdog.enabled {
539 let services = self.service.get().ok_or_else(|| {
540 anyhow::anyhow!(
541 "{} not initialized - init() must run before start()",
542 Self::MODULE_NAME
543 )
544 })?;
545 Some(crate::infra::workers::orphan_watchdog::OrphanWatchdogDeps {
546 finalization_svc: Arc::clone(&services.finalization),
547 turn_repo: Arc::clone(&services.turn_repo),
548 db: Arc::clone(&services.db),
549 metrics: Arc::clone(&services.metrics),
550 })
551 } else {
552 None
553 };
554
555 let (handles, worker_cancel) =
556 background_workers::spawn_workers(wc, &cancel, leader_elector.as_ref(), orphan_deps)?;
557 self.store_worker_runtime(handles, worker_cancel).await?;
558
559 Ok(())
560 }
561
562 async fn stop(&self, cancel: CancellationToken) -> anyhow::Result<()> {
563 if let Some(worker_cancel) = self
564 .worker_cancel
565 .lock()
566 .map_err(|e| anyhow::anyhow!("worker_cancel lock: {e}"))?
567 .take()
568 {
569 worker_cancel.cancel();
570 }
571
572 let workers = self
573 .worker_handles
574 .lock()
575 .map_err(|e| anyhow::anyhow!("worker_handles lock: {e}"))?
576 .take();
577 if let Some(handles) = workers {
578 info!("Waiting for background workers to stop");
579 handles.join_all(cancel.clone(), WORKER_STOP_TIMEOUT).await;
580 info!("Background workers stopped");
581 }
582
583 let handle = self
584 .outbox_handle
585 .lock()
586 .map_err(|e| anyhow::anyhow!("outbox_handle lock: {e}"))?
587 .take();
588 if let Some(handle) = handle {
589 info!("Stopping outbox pipeline");
590 tokio::select! {
591 () = handle.stop() => {
592 info!("Outbox pipeline stopped");
593 }
594 () = cancel.cancelled() => {
595 info!("Outbox pipeline stop cancelled by framework deadline");
596 }
597 }
598 }
599 Ok(())
600 }
601}
602
603impl MiniChatModule {
604 async fn store_worker_runtime(
605 &self,
606 handles: WorkerHandles,
607 worker_cancel: CancellationToken,
608 ) -> anyhow::Result<()> {
609 let worker_cancel_cleanup = worker_cancel.clone();
610
611 let cancel_already_set = {
613 let mut guard = self
614 .worker_cancel
615 .lock()
616 .map_err(|e| anyhow::anyhow!("worker_cancel lock: {e}"))?;
617 if guard.is_some() {
618 true
619 } else {
620 *guard = Some(worker_cancel);
621 false
622 }
623 };
625 if cancel_already_set {
626 worker_cancel_cleanup.cancel();
627 let hard_stop = CancellationToken::new();
628 hard_stop.cancel();
629 handles.join_all(hard_stop, WORKER_STOP_TIMEOUT).await;
630 anyhow::bail!("{} worker_cancel already set", Self::MODULE_NAME);
631 }
632
633 let mut handles = Some(handles);
635 let handles_err = {
636 match self.worker_handles.lock() {
637 Ok(mut guard) => {
638 if guard.is_some() {
639 Some("worker_handles already set".to_owned())
640 } else {
641 *guard = handles.take();
642 None
643 }
644 }
645 Err(e) => Some(format!("worker_handles lock: {e}")),
646 }
647 };
649 if let Some(msg) = handles_err {
650 if let Ok(mut cancel_guard) = self.worker_cancel.lock() {
651 cancel_guard.take();
652 }
653 worker_cancel_cleanup.cancel();
654 if let Some(handles) = handles {
655 let hard_stop = CancellationToken::new();
656 hard_stop.cancel();
657 handles.join_all(hard_stop, WORKER_STOP_TIMEOUT).await;
658 }
659 anyhow::bail!("{} {msg}", Self::MODULE_NAME);
665 }
666 Ok(())
667 }
668}
669
670async fn exchange_client_credentials(
673 authn: &Arc<dyn AuthNResolverClient>,
674 creds: &crate::config::ClientCredentialsConfig,
675) -> anyhow::Result<modkit_security::SecurityContext> {
676 info!("Exchanging client credentials for OAGW provisioning context");
677 let request = ClientCredentialsRequest {
678 client_id: creds.client_id.clone(),
679 client_secret: creds.client_secret.clone(),
680 scopes: Vec::new(),
681 };
682 let result = authn
683 .exchange_client_credentials(&request)
684 .await
685 .map_err(|e| anyhow::anyhow!("client credentials exchange failed: {e}"))?;
686 info!("Security context obtained for OAGW provisioning");
687 Ok(result.security_context)
688}
689
690async fn register_plugin_schemas(
691 registry: &dyn TypesRegistryClient,
692 schemas: &[(String, &str, &str)],
693) -> anyhow::Result<()> {
694 let mut payload = Vec::with_capacity(schemas.len());
695 for (schema_str, schema_id, _label) in schemas {
696 let mut schema_json: serde_json::Value = serde_json::from_str(schema_str)?;
697 let obj = schema_json
698 .as_object_mut()
699 .ok_or_else(|| anyhow::anyhow!("schema {schema_id} is not a JSON object"))?;
700 obj.insert(
701 "additionalProperties".to_owned(),
702 serde_json::Value::Bool(false),
703 );
704 payload.push(schema_json);
705 }
706 let results = registry.register(payload).await?;
707 RegisterResult::ensure_all_ok(&results)?;
708 for (_schema_str, schema_id, label) in schemas {
709 info!(schema_id = %schema_id, "Registered {label} plugin schema in types-registry");
710 }
711 Ok(())
712}