Skip to main content

helios_persistence/backends/mongodb/
backend.rs

1//! MongoDB backend implementation.
2
3use std::fmt::Debug;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use mongodb::{Client, Database, bson::doc, options::ClientOptions};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use tokio::sync::OnceCell;
13
14use helios_fhir::FhirVersion;
15
16use crate::core::{Backend, BackendCapability, BackendKind};
17use crate::error::{BackendError, StorageError, StorageResult};
18use crate::search::{SearchParameterExtractor, SearchParameterLoader, SearchParameterRegistry};
19
20use super::schema;
21
22/// Builds a MongoDB client from a backend configuration.
23///
24/// Standalone (rather than a `&self` method) so both [`MongoBackend`] and the
25/// in-DB SOF runner can initialise the shared client cell from a cloned config.
26pub(crate) async fn connect_client(config: &MongoBackendConfig) -> StorageResult<Client> {
27    let mut client_options = ClientOptions::parse(&config.connection_string)
28        .await
29        .map_err(|e| {
30            StorageError::Backend(BackendError::ConnectionFailed {
31                backend_name: "mongodb".to_string(),
32                message: e.to_string(),
33            })
34        })?;
35
36    client_options.max_pool_size = Some(config.max_connections);
37    client_options.connect_timeout = Some(Duration::from_millis(config.connect_timeout_ms));
38    client_options.app_name = Some("helios-persistence".to_string());
39
40    // Fail fast when no healthy server can be selected. `connect_timeout`
41    // only covers new TCP handshakes; this caps requests once server
42    // monitoring has marked the server unavailable.
43    client_options.server_selection_timeout = Some(SERVER_SELECTION_TIMEOUT);
44    // Recycle idle connections so stale ones (e.g. silently dropped by a
45    // NAT/firewall on a long-lived network path) are not handed out.
46    client_options.max_idle_time = Some(MAX_CONNECTION_IDLE_TIME);
47
48    Client::with_options(client_options).map_err(|e| {
49        StorageError::Backend(BackendError::Internal {
50            backend_name: "mongodb".to_string(),
51            message: format!("Failed to create MongoDB client: {}", e),
52            source: None,
53        })
54    })
55}
56
57/// Upper bound for selecting a usable server before an operation gives up.
58const SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(15);
59
60/// Connections idle longer than this are closed rather than reused, so a
61/// connection silently dropped by a NAT/firewall is never handed to a request.
62const MAX_CONNECTION_IDLE_TIME: Duration = Duration::from_secs(60);
63
64/// MongoDB backend for FHIR resource storage.
65///
66/// The Phase 4 implementation provides backend wiring, schema bootstrap,
67/// core ResourceStorage behavior for CRUD/count + tenant isolation,
68/// [`crate::core::VersionedStorage`] support, and history providers.
69///
70/// Basic search and conditional create/update/delete are available.
71/// Advanced search/composite behavior remains in later phases.
72pub struct MongoBackend {
73    config: MongoBackendConfig,
74    /// Lazily initialized MongoDB client. MongoDB clients own their connection
75    /// pools, so each backend instance must reuse one client. Wrapped in an
76    /// `Arc` so the in-DB SOF runner can share the same pooled client (it is
77    /// constructed from `&self` but outlives the borrow).
78    client: Arc<OnceCell<Client>>,
79    /// Search parameter registry (in-memory cache of active parameters).
80    search_registry: Arc<RwLock<SearchParameterRegistry>>,
81    /// Extractor for deriving searchable values from resources.
82    search_extractor: Arc<SearchParameterExtractor>,
83}
84
85impl Debug for MongoBackend {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        f.debug_struct("MongoBackend")
88            .field("config", &self.config)
89            .field("search_registry_len", &self.search_registry.read().len())
90            .finish_non_exhaustive()
91    }
92}
93
94/// Configuration for the MongoDB backend.
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct MongoBackendConfig {
97    /// MongoDB connection string.
98    #[serde(default = "default_connection_string")]
99    pub connection_string: String,
100
101    /// MongoDB database name used by this backend.
102    #[serde(default = "default_database_name")]
103    pub database_name: String,
104
105    /// Maximum number of connections in the driver pool.
106    #[serde(default = "default_max_connections")]
107    pub max_connections: u32,
108
109    /// Connection timeout in milliseconds.
110    #[serde(default = "default_connect_timeout_ms")]
111    pub connect_timeout_ms: u64,
112
113    /// FHIR version for this backend instance.
114    #[serde(default = "crate::default_fhir_version")]
115    pub fhir_version: FhirVersion,
116
117    /// Directory containing FHIR SearchParameter spec files.
118    #[serde(default)]
119    pub data_dir: Option<PathBuf>,
120
121    /// When true, search indexing is offloaded to a secondary backend.
122    #[serde(default)]
123    pub search_offloaded: bool,
124}
125
/// Default connection string: a MongoDB server on localhost, port 27017.
fn default_connection_string() -> String {
    String::from("mongodb://localhost:27017")
}
129
/// Default database name used when none is configured.
fn default_database_name() -> String {
    String::from("helios")
}
133
/// Default upper bound on connections in the driver pool.
fn default_max_connections() -> u32 {
    const DEFAULT_POOL_SIZE: u32 = 10;
    DEFAULT_POOL_SIZE
}
137
/// Default connection timeout, in milliseconds (five seconds).
fn default_connect_timeout_ms() -> u64 {
    const DEFAULT_TIMEOUT_MS: u64 = 5_000;
    DEFAULT_TIMEOUT_MS
}
141
142impl Default for MongoBackendConfig {
143    fn default() -> Self {
144        Self {
145            connection_string: default_connection_string(),
146            database_name: default_database_name(),
147            max_connections: default_max_connections(),
148            connect_timeout_ms: default_connect_timeout_ms(),
149            fhir_version: FhirVersion::default_enabled(),
150            data_dir: None,
151            search_offloaded: false,
152        }
153    }
154}
155
156impl MongoBackend {
157    pub(crate) const RESOURCES_COLLECTION: &'static str = "resources";
158    pub(crate) const RESOURCE_HISTORY_COLLECTION: &'static str = "resource_history";
159    pub(crate) const SEARCH_INDEX_COLLECTION: &'static str = "search_index";
160
161    /// Creates a new MongoDB backend from the provided configuration.
162    pub fn new(config: MongoBackendConfig) -> StorageResult<Self> {
163        Self::validate_connection_string(&config.connection_string)?;
164
165        let search_registry = Arc::new(RwLock::new(SearchParameterRegistry::new()));
166        Self::initialize_search_registry(&search_registry, &config);
167        let search_extractor = Arc::new(SearchParameterExtractor::new(search_registry.clone()));
168
169        Ok(Self {
170            config,
171            client: Arc::new(OnceCell::new()),
172            search_registry,
173            search_extractor,
174        })
175    }
176
177    /// Creates a backend from a MongoDB connection string.
178    pub fn from_connection_string(connection_string: impl Into<String>) -> StorageResult<Self> {
179        let config = MongoBackendConfig {
180            connection_string: connection_string.into(),
181            ..Default::default()
182        };
183        Self::new(config)
184    }
185
186    /// Creates a backend from environment variables.
187    ///
188    /// Supported variables:
189    /// - `HFS_MONGODB_URL` (preferred)
190    /// - `HFS_MONGODB_URI` (alias)
191    /// - `HFS_DATABASE_URL` (fallback)
192    /// - `HFS_MONGODB_DATABASE` (default: `helios`)
193    /// - `HFS_MONGODB_MAX_CONNECTIONS` (default: `10`)
194    /// - `HFS_MONGODB_CONNECT_TIMEOUT_MS` (default: `5000`)
195    pub fn from_env() -> StorageResult<Self> {
196        let connection_string = std::env::var("HFS_MONGODB_URL")
197            .or_else(|_| std::env::var("HFS_MONGODB_URI"))
198            .or_else(|_| std::env::var("HFS_DATABASE_URL"))
199            .unwrap_or_else(|_| default_connection_string());
200
201        let database_name =
202            std::env::var("HFS_MONGODB_DATABASE").unwrap_or_else(|_| default_database_name());
203
204        let max_connections = std::env::var("HFS_MONGODB_MAX_CONNECTIONS")
205            .ok()
206            .and_then(|v| v.parse::<u32>().ok())
207            .unwrap_or_else(default_max_connections);
208
209        let connect_timeout_ms = std::env::var("HFS_MONGODB_CONNECT_TIMEOUT_MS")
210            .ok()
211            .and_then(|v| v.parse::<u64>().ok())
212            .unwrap_or_else(default_connect_timeout_ms);
213
214        let config = MongoBackendConfig {
215            connection_string,
216            database_name,
217            max_connections,
218            connect_timeout_ms,
219            ..Default::default()
220        };
221
222        Self::new(config)
223    }
224
225    fn validate_connection_string(connection_string: &str) -> StorageResult<()> {
226        let uri = connection_string.trim();
227        if uri.is_empty() {
228            return Err(StorageError::Backend(BackendError::ConnectionFailed {
229                backend_name: "mongodb".to_string(),
230                message: "MongoDB connection string cannot be empty".to_string(),
231            }));
232        }
233
234        if !Self::looks_like_mongodb_uri(uri) {
235            tracing::warn!(
236                uri = %uri,
237                "MongoDB connection string does not start with mongodb:// or mongodb+srv://"
238            );
239        }
240
241        Ok(())
242    }
243
244    fn looks_like_mongodb_uri(connection_string: &str) -> bool {
245        connection_string.starts_with("mongodb://")
246            || connection_string.starts_with("mongodb+srv://")
247    }
248
249    fn initialize_search_registry(
250        registry: &Arc<RwLock<SearchParameterRegistry>>,
251        config: &MongoBackendConfig,
252    ) {
253        let loader = SearchParameterLoader::new(config.fhir_version);
254        let mut reg = registry.write();
255
256        let mut fallback_count = 0;
257        let mut spec_count = 0;
258        let mut spec_file: Option<PathBuf> = None;
259        let mut custom_count = 0;
260        let mut custom_files: Vec<String> = Vec::new();
261
262        // 1. Load minimal embedded fallback params.
263        match loader.load_embedded() {
264            Ok(params) => {
265                for param in params {
266                    if reg.register(param).is_ok() {
267                        fallback_count += 1;
268                    }
269                }
270            }
271            Err(e) => {
272                tracing::error!("Failed to load embedded SearchParameters: {}", e);
273            }
274        }
275
276        // 2. Load spec file params.
277        let data_dir = config
278            .data_dir
279            .clone()
280            .unwrap_or_else(|| PathBuf::from("./data"));
281        let spec_filename = loader.spec_filename();
282        let spec_path = data_dir.join(spec_filename);
283        match loader.load_from_spec_file(&data_dir) {
284            Ok(params) => {
285                for param in params {
286                    if reg.register(param).is_ok() {
287                        spec_count += 1;
288                    }
289                }
290                if spec_count > 0 {
291                    spec_file = Some(spec_path);
292                }
293            }
294            Err(e) => {
295                tracing::warn!(
296                    "Could not load spec SearchParameters from {}: {}. Using minimal fallback.",
297                    spec_path.display(),
298                    e
299                );
300            }
301        }
302
303        // 3. Load custom SearchParameters.
304        match loader.load_custom_from_directory_with_files(&data_dir) {
305            Ok((params, files)) => {
306                for param in params {
307                    if reg.register(param).is_ok() {
308                        custom_count += 1;
309                    }
310                }
311                custom_files = files;
312            }
313            Err(e) => {
314                tracing::warn!(
315                    "Error loading custom SearchParameters from {}: {}",
316                    data_dir.display(),
317                    e
318                );
319            }
320        }
321
322        let resource_type_count = reg.resource_types().len();
323        let spec_info = spec_file
324            .map(|p| format!(" from {}", p.display()))
325            .unwrap_or_default();
326        let custom_info = if custom_files.is_empty() {
327            String::new()
328        } else {
329            format!(" [{}]", custom_files.join(", "))
330        };
331
332        tracing::info!(
333            "MongoDB SearchParameter registry initialized: {} total ({} spec{}, {} fallback, {} custom{}) covering {} resource types",
334            reg.len(),
335            spec_count,
336            spec_info,
337            fallback_count,
338            custom_count,
339            custom_info,
340            resource_type_count
341        );
342    }
343
344    /// Initializes the MongoDB schema/index bootstrap for this backend.
345    pub async fn init_schema(&self) -> StorageResult<()> {
346        let db = self.get_database().await?;
347        schema::initialize_schema_async(&db).await
348    }
349
350    /// Creates a MongoDB client from backend configuration.
351    /// Returns the shared MongoDB client for this backend.
352    pub(crate) async fn get_client(&self) -> StorageResult<Client> {
353        self.client
354            .get_or_try_init(|| connect_client(&self.config))
355            .await
356            .cloned()
357    }
358
359    /// Returns a handle to the shared client cell, so collaborators constructed
360    /// from `&self` (e.g. the in-DB SOF runner) can lazily initialise and reuse
361    /// the same pooled client.
362    pub(crate) fn client_cell(&self) -> Arc<OnceCell<Client>> {
363        Arc::clone(&self.client)
364    }
365
366    /// Returns the configured MongoDB database handle.
367    pub(crate) async fn get_database(&self) -> StorageResult<Database> {
368        let client = self.get_client().await?;
369        Ok(client.database(&self.config.database_name))
370    }
371
372    /// Returns the backend configuration.
373    pub fn config(&self) -> &MongoBackendConfig {
374        &self.config
375    }
376
377    /// Returns a reference to the search parameter registry.
378    pub fn search_registry(&self) -> &Arc<RwLock<SearchParameterRegistry>> {
379        &self.search_registry
380    }
381
382    /// Returns a reference to the search parameter extractor.
383    pub fn search_extractor(&self) -> &Arc<SearchParameterExtractor> {
384        &self.search_extractor
385    }
386
387    /// Returns whether search indexing is offloaded to a secondary backend.
388    pub fn is_search_offloaded(&self) -> bool {
389        self.config.search_offloaded
390    }
391
392    /// Sets the search-offloaded flag.
393    pub fn set_search_offloaded(&mut self, offloaded: bool) {
394        self.config.search_offloaded = offloaded;
395    }
396}
397
398/// Connection wrapper for MongoDB.
399#[derive(Clone)]
400pub struct MongoConnection {
401    pub(crate) database: Database,
402}
403
404impl Debug for MongoConnection {
405    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
406        f.debug_struct("MongoConnection")
407            .field("database", &self.database.name())
408            .finish_non_exhaustive()
409    }
410}
411
412#[async_trait]
413impl Backend for MongoBackend {
414    type Connection = MongoConnection;
415
416    fn kind(&self) -> BackendKind {
417        BackendKind::MongoDB
418    }
419
420    fn name(&self) -> &'static str {
421        "mongodb"
422    }
423
424    fn supports(&self, capability: BackendCapability) -> bool {
425        matches!(
426            capability,
427            BackendCapability::Crud
428                | BackendCapability::Versioning
429                | BackendCapability::InstanceHistory
430                | BackendCapability::TypeHistory
431                | BackendCapability::SystemHistory
432                | BackendCapability::BasicSearch
433                | BackendCapability::DateSearch
434                | BackendCapability::ReferenceSearch
435                | BackendCapability::Sorting
436                | BackendCapability::OffsetPagination
437                | BackendCapability::CursorPagination
438                | BackendCapability::Transactions
439                | BackendCapability::OptimisticLocking
440                | BackendCapability::SharedSchema
441        )
442    }
443
444    fn capabilities(&self) -> Vec<BackendCapability> {
445        vec![
446            BackendCapability::Crud,
447            BackendCapability::Versioning,
448            BackendCapability::InstanceHistory,
449            BackendCapability::TypeHistory,
450            BackendCapability::SystemHistory,
451            BackendCapability::BasicSearch,
452            BackendCapability::DateSearch,
453            BackendCapability::ReferenceSearch,
454            BackendCapability::Sorting,
455            BackendCapability::OffsetPagination,
456            BackendCapability::CursorPagination,
457            BackendCapability::Transactions,
458            BackendCapability::OptimisticLocking,
459            BackendCapability::SharedSchema,
460        ]
461    }
462
463    async fn acquire(&self) -> Result<Self::Connection, BackendError> {
464        let client = self
465            .get_client()
466            .await
467            .map_err(|e| BackendError::ConnectionFailed {
468                backend_name: "mongodb".to_string(),
469                message: e.to_string(),
470            })?;
471        let database = client.database(&self.config.database_name);
472        Ok(MongoConnection { database })
473    }
474
475    async fn release(&self, _conn: Self::Connection) {
476        // MongoDB connection pooling is managed by the client internally.
477    }
478
479    async fn health_check(&self) -> Result<(), BackendError> {
480        if !Self::looks_like_mongodb_uri(&self.config.connection_string) {
481            return Err(BackendError::Unavailable {
482                backend_name: "mongodb".to_string(),
483                message: "Invalid MongoDB connection string format".to_string(),
484            });
485        }
486
487        let db = self
488            .get_database()
489            .await
490            .map_err(|e| BackendError::Unavailable {
491                backend_name: "mongodb".to_string(),
492                message: format!("Unable to create database handle: {}", e),
493            })?;
494
495        db.run_command(doc! { "ping": 1_i32 })
496            .await
497            .map_err(|e| BackendError::Unavailable {
498                backend_name: "mongodb".to_string(),
499                message: format!("Health check failed: {}", e),
500            })?;
501
502        Ok(())
503    }
504
505    async fn initialize(&self) -> Result<(), BackendError> {
506        self.init_schema()
507            .await
508            .map_err(|e| BackendError::Internal {
509                backend_name: "mongodb".to_string(),
510                message: format!("Failed to initialize schema: {}", e),
511                source: None,
512            })
513    }
514
515    async fn migrate(&self) -> Result<(), BackendError> {
516        let db = self
517            .get_database()
518            .await
519            .map_err(|e| BackendError::Internal {
520                backend_name: "mongodb".to_string(),
521                message: format!("Failed to acquire database for migration: {}", e),
522                source: None,
523            })?;
524
525        schema::migrate_schema_async(&db)
526            .await
527            .map_err(|e| BackendError::Internal {
528                backend_name: "mongodb".to_string(),
529                message: format!("Failed to run migrations: {}", e),
530                source: None,
531            })
532    }
533}
534
535// ============================================================================
536// SearchCapabilityProvider Implementation
537// ============================================================================
538
539use crate::core::capabilities::{
540    GlobalSearchCapabilities, ResourceSearchCapabilities, SearchCapabilityProvider,
541};
542use crate::types::{
543    IncludeCapability, PaginationCapability, ResultModeCapability, SearchParamFullCapability,
544    SearchParamType, SpecialSearchParam,
545};
546
547impl MongoBackend {
548    /// Returns the search modifiers the MongoDB backend actually honors for a
549    /// parameter type. Unlike the SQL backends, the Mongo search implementation
550    /// does not implement `:missing`, token `:not`/`:of-type`, reference
551    /// `:identifier`, or uri `:above`/`:below`; advertising only what
552    /// `search_impl` accepts keeps the CapabilityStatement honest.
553    pub(super) fn modifiers_for_type(param_type: SearchParamType) -> Vec<&'static str> {
554        match param_type {
555            SearchParamType::String => vec!["exact", "contains", "text"],
556            SearchParamType::Token => vec!["text", "code-text"],
557            SearchParamType::Reference => vec!["contains", "text", "code-text"],
558            SearchParamType::Uri => vec!["exact", "contains"],
559            SearchParamType::Date
560            | SearchParamType::Number
561            | SearchParamType::Quantity
562            | SearchParamType::Composite
563            | SearchParamType::Special => vec![],
564        }
565    }
566}
567
568impl SearchCapabilityProvider for MongoBackend {
569    fn resource_search_capabilities(
570        &self,
571        resource_type: &str,
572    ) -> Option<ResourceSearchCapabilities> {
573        let params = {
574            let registry = self.search_registry.read();
575            registry.get_active_params(resource_type)
576        };
577        let common_params = {
578            let registry = self.search_registry.read();
579            registry.get_active_params("Resource")
580        };
581        if params.is_empty() && common_params.is_empty() {
582            return None;
583        }
584
585        let mut search_params = Vec::new();
586        for param in &params {
587            let mut cap = SearchParamFullCapability::new(&param.code, param.param_type)
588                .with_definition(&param.url)
589                .with_modifiers(Self::modifiers_for_type(param.param_type));
590            if let Some(ref targets) = param.target {
591                cap = cap.with_targets(targets.iter().map(|s| s.as_str()));
592            }
593            search_params.push(cap);
594        }
595        for param in &common_params {
596            if !search_params.iter().any(|p| p.name == param.code) {
597                search_params.push(
598                    SearchParamFullCapability::new(&param.code, param.param_type)
599                        .with_definition(&param.url)
600                        .with_modifiers(Self::modifiers_for_type(param.param_type)),
601                );
602            }
603        }
604
605        Some(
606            ResourceSearchCapabilities::new(resource_type)
607                .with_special_params(vec![SpecialSearchParam::Id])
608                .with_include_capabilities(vec![
609                    IncludeCapability::Include,
610                    IncludeCapability::Revinclude,
611                ])
612                .with_pagination_capabilities(vec![
613                    PaginationCapability::Count,
614                    PaginationCapability::Offset,
615                    PaginationCapability::MaxPageSize(1000),
616                    PaginationCapability::DefaultPageSize(20),
617                ])
618                .with_result_mode_capabilities(vec![ResultModeCapability::Total])
619                .with_param_list(search_params),
620        )
621    }
622
623    fn global_search_capabilities(&self) -> GlobalSearchCapabilities {
624        GlobalSearchCapabilities::new().with_special_params(vec![SpecialSearchParam::Id])
625    }
626}
627
#[cfg(test)]
mod capability_tests {
    use super::*;

    /// Pins which modifiers each parameter type advertises, including the ones
    /// that must NOT be advertised because Mongo search does not honor them.
    #[test]
    fn test_modifiers_for_type_reflects_mongo_support() {
        // String honors exact/contains/text but not :missing.
        let string_mods = MongoBackend::modifiers_for_type(SearchParamType::String);
        for expected in ["exact", "text"] {
            assert!(string_mods.contains(&expected));
        }
        assert!(!string_mods.contains(&"missing"));

        // Token honors text/code-text but not the non-spec :code, nor :not / :of-type.
        let token_mods = MongoBackend::modifiers_for_type(SearchParamType::Token);
        assert!(token_mods.contains(&"code-text"));
        for unsupported in ["code", "not", "of-type"] {
            assert!(!token_mods.contains(&unsupported));
        }

        // Reference honors contains/text/code-text but not :identifier.
        let reference_mods = MongoBackend::modifiers_for_type(SearchParamType::Reference);
        for expected in ["contains", "text"] {
            assert!(reference_mods.contains(&expected));
        }
        assert!(!reference_mods.contains(&"identifier"));

        // Uri honors exact/contains but not :above/:below.
        let uri_mods = MongoBackend::modifiers_for_type(SearchParamType::Uri);
        assert!(uri_mods.contains(&"contains"));
        for unsupported in ["above", "below"] {
            assert!(!uri_mods.contains(&unsupported));
        }
    }
}