helios_persistence/backends/mongodb/
backend.rs1use std::fmt::Debug;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use mongodb::{Client, Database, bson::doc, options::ClientOptions};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use tokio::sync::OnceCell;
13
14use helios_fhir::FhirVersion;
15
16use crate::core::{Backend, BackendCapability, BackendKind};
17use crate::error::{BackendError, StorageError, StorageResult};
18use crate::search::{SearchParameterExtractor, SearchParameterLoader, SearchParameterRegistry};
19
20use super::schema;
21
22pub(crate) async fn connect_client(config: &MongoBackendConfig) -> StorageResult<Client> {
27 let mut client_options = ClientOptions::parse(&config.connection_string)
28 .await
29 .map_err(|e| {
30 StorageError::Backend(BackendError::ConnectionFailed {
31 backend_name: "mongodb".to_string(),
32 message: e.to_string(),
33 })
34 })?;
35
36 client_options.max_pool_size = Some(config.max_connections);
37 client_options.connect_timeout = Some(Duration::from_millis(config.connect_timeout_ms));
38 client_options.app_name = Some("helios-persistence".to_string());
39
40 client_options.server_selection_timeout = Some(SERVER_SELECTION_TIMEOUT);
44 client_options.max_idle_time = Some(MAX_CONNECTION_IDLE_TIME);
47
48 Client::with_options(client_options).map_err(|e| {
49 StorageError::Backend(BackendError::Internal {
50 backend_name: "mongodb".to_string(),
51 message: format!("Failed to create MongoDB client: {}", e),
52 source: None,
53 })
54 })
55}
56
57const SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(15);
59
60const MAX_CONNECTION_IDLE_TIME: Duration = Duration::from_secs(60);
63
64pub struct MongoBackend {
73 config: MongoBackendConfig,
74 client: Arc<OnceCell<Client>>,
79 search_registry: Arc<RwLock<SearchParameterRegistry>>,
81 search_extractor: Arc<SearchParameterExtractor>,
83}
84
85impl Debug for MongoBackend {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 f.debug_struct("MongoBackend")
88 .field("config", &self.config)
89 .field("search_registry_len", &self.search_registry.read().len())
90 .finish_non_exhaustive()
91 }
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct MongoBackendConfig {
97 #[serde(default = "default_connection_string")]
99 pub connection_string: String,
100
101 #[serde(default = "default_database_name")]
103 pub database_name: String,
104
105 #[serde(default = "default_max_connections")]
107 pub max_connections: u32,
108
109 #[serde(default = "default_connect_timeout_ms")]
111 pub connect_timeout_ms: u64,
112
113 #[serde(default = "crate::default_fhir_version")]
115 pub fhir_version: FhirVersion,
116
117 #[serde(default)]
119 pub data_dir: Option<PathBuf>,
120
121 #[serde(default)]
123 pub search_offloaded: bool,
124}
125
126fn default_connection_string() -> String {
127 "mongodb://localhost:27017".to_string()
128}
129
130fn default_database_name() -> String {
131 "helios".to_string()
132}
133
134fn default_max_connections() -> u32 {
135 10
136}
137
138fn default_connect_timeout_ms() -> u64 {
139 5000
140}
141
142impl Default for MongoBackendConfig {
143 fn default() -> Self {
144 Self {
145 connection_string: default_connection_string(),
146 database_name: default_database_name(),
147 max_connections: default_max_connections(),
148 connect_timeout_ms: default_connect_timeout_ms(),
149 fhir_version: FhirVersion::default_enabled(),
150 data_dir: None,
151 search_offloaded: false,
152 }
153 }
154}
155
156impl MongoBackend {
157 pub(crate) const RESOURCES_COLLECTION: &'static str = "resources";
158 pub(crate) const RESOURCE_HISTORY_COLLECTION: &'static str = "resource_history";
159 pub(crate) const SEARCH_INDEX_COLLECTION: &'static str = "search_index";
160
161 pub fn new(config: MongoBackendConfig) -> StorageResult<Self> {
163 Self::validate_connection_string(&config.connection_string)?;
164
165 let search_registry = Arc::new(RwLock::new(SearchParameterRegistry::new()));
166 Self::initialize_search_registry(&search_registry, &config);
167 let search_extractor = Arc::new(SearchParameterExtractor::new(search_registry.clone()));
168
169 Ok(Self {
170 config,
171 client: Arc::new(OnceCell::new()),
172 search_registry,
173 search_extractor,
174 })
175 }
176
177 pub fn from_connection_string(connection_string: impl Into<String>) -> StorageResult<Self> {
179 let config = MongoBackendConfig {
180 connection_string: connection_string.into(),
181 ..Default::default()
182 };
183 Self::new(config)
184 }
185
186 pub fn from_env() -> StorageResult<Self> {
196 let connection_string = std::env::var("HFS_MONGODB_URL")
197 .or_else(|_| std::env::var("HFS_MONGODB_URI"))
198 .or_else(|_| std::env::var("HFS_DATABASE_URL"))
199 .unwrap_or_else(|_| default_connection_string());
200
201 let database_name =
202 std::env::var("HFS_MONGODB_DATABASE").unwrap_or_else(|_| default_database_name());
203
204 let max_connections = std::env::var("HFS_MONGODB_MAX_CONNECTIONS")
205 .ok()
206 .and_then(|v| v.parse::<u32>().ok())
207 .unwrap_or_else(default_max_connections);
208
209 let connect_timeout_ms = std::env::var("HFS_MONGODB_CONNECT_TIMEOUT_MS")
210 .ok()
211 .and_then(|v| v.parse::<u64>().ok())
212 .unwrap_or_else(default_connect_timeout_ms);
213
214 let config = MongoBackendConfig {
215 connection_string,
216 database_name,
217 max_connections,
218 connect_timeout_ms,
219 ..Default::default()
220 };
221
222 Self::new(config)
223 }
224
225 fn validate_connection_string(connection_string: &str) -> StorageResult<()> {
226 let uri = connection_string.trim();
227 if uri.is_empty() {
228 return Err(StorageError::Backend(BackendError::ConnectionFailed {
229 backend_name: "mongodb".to_string(),
230 message: "MongoDB connection string cannot be empty".to_string(),
231 }));
232 }
233
234 if !Self::looks_like_mongodb_uri(uri) {
235 tracing::warn!(
236 uri = %uri,
237 "MongoDB connection string does not start with mongodb:// or mongodb+srv://"
238 );
239 }
240
241 Ok(())
242 }
243
244 fn looks_like_mongodb_uri(connection_string: &str) -> bool {
245 connection_string.starts_with("mongodb://")
246 || connection_string.starts_with("mongodb+srv://")
247 }
248
249 fn initialize_search_registry(
250 registry: &Arc<RwLock<SearchParameterRegistry>>,
251 config: &MongoBackendConfig,
252 ) {
253 let loader = SearchParameterLoader::new(config.fhir_version);
254 let mut reg = registry.write();
255
256 let mut fallback_count = 0;
257 let mut spec_count = 0;
258 let mut spec_file: Option<PathBuf> = None;
259 let mut custom_count = 0;
260 let mut custom_files: Vec<String> = Vec::new();
261
262 match loader.load_embedded() {
264 Ok(params) => {
265 for param in params {
266 if reg.register(param).is_ok() {
267 fallback_count += 1;
268 }
269 }
270 }
271 Err(e) => {
272 tracing::error!("Failed to load embedded SearchParameters: {}", e);
273 }
274 }
275
276 let data_dir = config
278 .data_dir
279 .clone()
280 .unwrap_or_else(|| PathBuf::from("./data"));
281 let spec_filename = loader.spec_filename();
282 let spec_path = data_dir.join(spec_filename);
283 match loader.load_from_spec_file(&data_dir) {
284 Ok(params) => {
285 for param in params {
286 if reg.register(param).is_ok() {
287 spec_count += 1;
288 }
289 }
290 if spec_count > 0 {
291 spec_file = Some(spec_path);
292 }
293 }
294 Err(e) => {
295 tracing::warn!(
296 "Could not load spec SearchParameters from {}: {}. Using minimal fallback.",
297 spec_path.display(),
298 e
299 );
300 }
301 }
302
303 match loader.load_custom_from_directory_with_files(&data_dir) {
305 Ok((params, files)) => {
306 for param in params {
307 if reg.register(param).is_ok() {
308 custom_count += 1;
309 }
310 }
311 custom_files = files;
312 }
313 Err(e) => {
314 tracing::warn!(
315 "Error loading custom SearchParameters from {}: {}",
316 data_dir.display(),
317 e
318 );
319 }
320 }
321
322 let resource_type_count = reg.resource_types().len();
323 let spec_info = spec_file
324 .map(|p| format!(" from {}", p.display()))
325 .unwrap_or_default();
326 let custom_info = if custom_files.is_empty() {
327 String::new()
328 } else {
329 format!(" [{}]", custom_files.join(", "))
330 };
331
332 tracing::info!(
333 "MongoDB SearchParameter registry initialized: {} total ({} spec{}, {} fallback, {} custom{}) covering {} resource types",
334 reg.len(),
335 spec_count,
336 spec_info,
337 fallback_count,
338 custom_count,
339 custom_info,
340 resource_type_count
341 );
342 }
343
344 pub async fn init_schema(&self) -> StorageResult<()> {
346 let db = self.get_database().await?;
347 schema::initialize_schema_async(&db).await
348 }
349
350 pub(crate) async fn get_client(&self) -> StorageResult<Client> {
353 self.client
354 .get_or_try_init(|| connect_client(&self.config))
355 .await
356 .cloned()
357 }
358
359 pub(crate) fn client_cell(&self) -> Arc<OnceCell<Client>> {
363 Arc::clone(&self.client)
364 }
365
366 pub(crate) async fn get_database(&self) -> StorageResult<Database> {
368 let client = self.get_client().await?;
369 Ok(client.database(&self.config.database_name))
370 }
371
372 pub fn config(&self) -> &MongoBackendConfig {
374 &self.config
375 }
376
377 pub fn search_registry(&self) -> &Arc<RwLock<SearchParameterRegistry>> {
379 &self.search_registry
380 }
381
382 pub fn search_extractor(&self) -> &Arc<SearchParameterExtractor> {
384 &self.search_extractor
385 }
386
387 pub fn is_search_offloaded(&self) -> bool {
389 self.config.search_offloaded
390 }
391
392 pub fn set_search_offloaded(&mut self, offloaded: bool) {
394 self.config.search_offloaded = offloaded;
395 }
396}
397
398#[derive(Clone)]
400pub struct MongoConnection {
401 pub(crate) database: Database,
402}
403
404impl Debug for MongoConnection {
405 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
406 f.debug_struct("MongoConnection")
407 .field("database", &self.database.name())
408 .finish_non_exhaustive()
409 }
410}
411
412#[async_trait]
413impl Backend for MongoBackend {
414 type Connection = MongoConnection;
415
416 fn kind(&self) -> BackendKind {
417 BackendKind::MongoDB
418 }
419
420 fn name(&self) -> &'static str {
421 "mongodb"
422 }
423
424 fn supports(&self, capability: BackendCapability) -> bool {
425 matches!(
426 capability,
427 BackendCapability::Crud
428 | BackendCapability::Versioning
429 | BackendCapability::InstanceHistory
430 | BackendCapability::TypeHistory
431 | BackendCapability::SystemHistory
432 | BackendCapability::BasicSearch
433 | BackendCapability::DateSearch
434 | BackendCapability::ReferenceSearch
435 | BackendCapability::Sorting
436 | BackendCapability::OffsetPagination
437 | BackendCapability::CursorPagination
438 | BackendCapability::Transactions
439 | BackendCapability::OptimisticLocking
440 | BackendCapability::SharedSchema
441 )
442 }
443
444 fn capabilities(&self) -> Vec<BackendCapability> {
445 vec![
446 BackendCapability::Crud,
447 BackendCapability::Versioning,
448 BackendCapability::InstanceHistory,
449 BackendCapability::TypeHistory,
450 BackendCapability::SystemHistory,
451 BackendCapability::BasicSearch,
452 BackendCapability::DateSearch,
453 BackendCapability::ReferenceSearch,
454 BackendCapability::Sorting,
455 BackendCapability::OffsetPagination,
456 BackendCapability::CursorPagination,
457 BackendCapability::Transactions,
458 BackendCapability::OptimisticLocking,
459 BackendCapability::SharedSchema,
460 ]
461 }
462
463 async fn acquire(&self) -> Result<Self::Connection, BackendError> {
464 let client = self
465 .get_client()
466 .await
467 .map_err(|e| BackendError::ConnectionFailed {
468 backend_name: "mongodb".to_string(),
469 message: e.to_string(),
470 })?;
471 let database = client.database(&self.config.database_name);
472 Ok(MongoConnection { database })
473 }
474
475 async fn release(&self, _conn: Self::Connection) {
476 }
478
479 async fn health_check(&self) -> Result<(), BackendError> {
480 if !Self::looks_like_mongodb_uri(&self.config.connection_string) {
481 return Err(BackendError::Unavailable {
482 backend_name: "mongodb".to_string(),
483 message: "Invalid MongoDB connection string format".to_string(),
484 });
485 }
486
487 let db = self
488 .get_database()
489 .await
490 .map_err(|e| BackendError::Unavailable {
491 backend_name: "mongodb".to_string(),
492 message: format!("Unable to create database handle: {}", e),
493 })?;
494
495 db.run_command(doc! { "ping": 1_i32 })
496 .await
497 .map_err(|e| BackendError::Unavailable {
498 backend_name: "mongodb".to_string(),
499 message: format!("Health check failed: {}", e),
500 })?;
501
502 Ok(())
503 }
504
505 async fn initialize(&self) -> Result<(), BackendError> {
506 self.init_schema()
507 .await
508 .map_err(|e| BackendError::Internal {
509 backend_name: "mongodb".to_string(),
510 message: format!("Failed to initialize schema: {}", e),
511 source: None,
512 })
513 }
514
515 async fn migrate(&self) -> Result<(), BackendError> {
516 let db = self
517 .get_database()
518 .await
519 .map_err(|e| BackendError::Internal {
520 backend_name: "mongodb".to_string(),
521 message: format!("Failed to acquire database for migration: {}", e),
522 source: None,
523 })?;
524
525 schema::migrate_schema_async(&db)
526 .await
527 .map_err(|e| BackendError::Internal {
528 backend_name: "mongodb".to_string(),
529 message: format!("Failed to run migrations: {}", e),
530 source: None,
531 })
532 }
533}
534
535use crate::core::capabilities::{
540 GlobalSearchCapabilities, ResourceSearchCapabilities, SearchCapabilityProvider,
541};
542use crate::types::{
543 IncludeCapability, PaginationCapability, ResultModeCapability, SearchParamFullCapability,
544 SearchParamType, SpecialSearchParam,
545};
546
547impl MongoBackend {
548 pub(super) fn modifiers_for_type(param_type: SearchParamType) -> Vec<&'static str> {
554 match param_type {
555 SearchParamType::String => vec!["exact", "contains", "text"],
556 SearchParamType::Token => vec!["text", "code-text"],
557 SearchParamType::Reference => vec!["contains", "text", "code-text"],
558 SearchParamType::Uri => vec!["exact", "contains"],
559 SearchParamType::Date
560 | SearchParamType::Number
561 | SearchParamType::Quantity
562 | SearchParamType::Composite
563 | SearchParamType::Special => vec![],
564 }
565 }
566}
567
568impl SearchCapabilityProvider for MongoBackend {
569 fn resource_search_capabilities(
570 &self,
571 resource_type: &str,
572 ) -> Option<ResourceSearchCapabilities> {
573 let params = {
574 let registry = self.search_registry.read();
575 registry.get_active_params(resource_type)
576 };
577 let common_params = {
578 let registry = self.search_registry.read();
579 registry.get_active_params("Resource")
580 };
581 if params.is_empty() && common_params.is_empty() {
582 return None;
583 }
584
585 let mut search_params = Vec::new();
586 for param in ¶ms {
587 let mut cap = SearchParamFullCapability::new(¶m.code, param.param_type)
588 .with_definition(¶m.url)
589 .with_modifiers(Self::modifiers_for_type(param.param_type));
590 if let Some(ref targets) = param.target {
591 cap = cap.with_targets(targets.iter().map(|s| s.as_str()));
592 }
593 search_params.push(cap);
594 }
595 for param in &common_params {
596 if !search_params.iter().any(|p| p.name == param.code) {
597 search_params.push(
598 SearchParamFullCapability::new(¶m.code, param.param_type)
599 .with_definition(¶m.url)
600 .with_modifiers(Self::modifiers_for_type(param.param_type)),
601 );
602 }
603 }
604
605 Some(
606 ResourceSearchCapabilities::new(resource_type)
607 .with_special_params(vec![SpecialSearchParam::Id])
608 .with_include_capabilities(vec![
609 IncludeCapability::Include,
610 IncludeCapability::Revinclude,
611 ])
612 .with_pagination_capabilities(vec![
613 PaginationCapability::Count,
614 PaginationCapability::Offset,
615 PaginationCapability::MaxPageSize(1000),
616 PaginationCapability::DefaultPageSize(20),
617 ])
618 .with_result_mode_capabilities(vec![ResultModeCapability::Total])
619 .with_param_list(search_params),
620 )
621 }
622
623 fn global_search_capabilities(&self) -> GlobalSearchCapabilities {
624 GlobalSearchCapabilities::new().with_special_params(vec![SpecialSearchParam::Id])
625 }
626}
627
628#[cfg(test)]
629mod capability_tests {
630 use super::*;
631
632 #[test]
633 fn test_modifiers_for_type_reflects_mongo_support() {
634 let s = MongoBackend::modifiers_for_type(SearchParamType::String);
636 assert!(s.contains(&"exact"));
637 assert!(s.contains(&"text"));
638 assert!(!s.contains(&"missing"));
639
640 let t = MongoBackend::modifiers_for_type(SearchParamType::Token);
642 assert!(!t.contains(&"code"));
643 assert!(t.contains(&"code-text"));
644 assert!(!t.contains(&"not"));
645 assert!(!t.contains(&"of-type"));
646
647 let r = MongoBackend::modifiers_for_type(SearchParamType::Reference);
649 assert!(r.contains(&"contains"));
650 assert!(r.contains(&"text"));
651 assert!(!r.contains(&"identifier"));
652
653 let u = MongoBackend::modifiers_for_type(SearchParamType::Uri);
655 assert!(u.contains(&"contains"));
656 assert!(!u.contains(&"above"));
657 assert!(!u.contains(&"below"));
658 }
659}