Skip to main content

helios_persistence/backends/elasticsearch/
backend.rs

1//! Elasticsearch backend implementation.
2
3use std::fmt::Debug;
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use elasticsearch::Elasticsearch;
9use elasticsearch::auth::Credentials;
10use elasticsearch::cert::CertificateValidation;
11use elasticsearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
12use parking_lot::RwLock;
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15
16use helios_fhir::FhirVersion;
17
18use crate::core::{Backend, BackendCapability, BackendKind};
19use crate::error::{BackendError, StorageResult};
20use crate::search::{SearchParameterExtractor, SearchParameterLoader, SearchParameterRegistry};
21
22/// Authentication configuration for Elasticsearch.
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub enum ElasticsearchAuth {
25    /// Basic username/password authentication.
26    Basic {
27        /// The username for basic auth.
28        username: String,
29        /// The password for basic auth.
30        password: String,
31    },
32    /// Bearer token authentication.
33    Bearer {
34        /// The bearer token.
35        token: String,
36    },
37}
38
39/// Configuration for the Elasticsearch backend.
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct ElasticsearchConfig {
42    /// Elasticsearch node URLs (e.g., `["http://localhost:9200"]`).
43    /// Currently uses the first node (single-node connection pool).
44    pub nodes: Vec<String>,
45
46    /// Index name prefix (default: `"hfs"`).
47    /// Indices are named: `{prefix}_{tenant_id}_{resource_type_lowercase}`
48    #[serde(default = "default_index_prefix")]
49    pub index_prefix: String,
50
51    /// Number of primary shards per index (default: 1).
52    #[serde(default = "default_shards")]
53    pub number_of_shards: u32,
54
55    /// Number of replica shards per index (default: 1).
56    #[serde(default = "default_replicas")]
57    pub number_of_replicas: u32,
58
59    /// Refresh interval (default: "1s").
60    #[serde(default = "default_refresh_interval")]
61    pub refresh_interval: String,
62
63    /// Maximum result window size (default: 10000).
64    #[serde(default = "default_max_result_window")]
65    pub max_result_window: u32,
66
67    /// Request timeout in milliseconds (default: 30000).
68    #[serde(default = "default_request_timeout_ms")]
69    pub request_timeout_ms: u64,
70
71    /// Optional authentication.
72    #[serde(default)]
73    pub auth: Option<ElasticsearchAuth>,
74
75    /// Whether to disable certificate validation (default: false).
76    /// Only use for development/testing.
77    #[serde(default)]
78    pub disable_certificate_validation: bool,
79
80    /// FHIR version for SearchParameter loading.
81    #[serde(default)]
82    pub fhir_version: FhirVersion,
83}
84
/// Serde default for `ElasticsearchConfig::index_prefix`.
fn default_index_prefix() -> String {
    String::from("hfs")
}
88
/// Serde default for `ElasticsearchConfig::number_of_shards`.
fn default_shards() -> u32 {
    const DEFAULT_SHARDS: u32 = 1;
    DEFAULT_SHARDS
}
92
/// Serde default for `ElasticsearchConfig::number_of_replicas`.
fn default_replicas() -> u32 {
    const DEFAULT_REPLICAS: u32 = 1;
    DEFAULT_REPLICAS
}
96
/// Serde default for `ElasticsearchConfig::refresh_interval`.
fn default_refresh_interval() -> String {
    String::from("1s")
}
100
/// Serde default for `ElasticsearchConfig::max_result_window`.
fn default_max_result_window() -> u32 {
    const DEFAULT_MAX_RESULT_WINDOW: u32 = 10_000;
    DEFAULT_MAX_RESULT_WINDOW
}
104
/// Serde default for `ElasticsearchConfig::request_timeout_ms`.
fn default_request_timeout_ms() -> u64 {
    const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30_000;
    DEFAULT_REQUEST_TIMEOUT_MS
}
108
109impl Default for ElasticsearchConfig {
110    fn default() -> Self {
111        Self {
112            nodes: vec!["http://localhost:9200".to_string()],
113            index_prefix: default_index_prefix(),
114            number_of_shards: default_shards(),
115            number_of_replicas: default_replicas(),
116            refresh_interval: default_refresh_interval(),
117            max_result_window: default_max_result_window(),
118            request_timeout_ms: default_request_timeout_ms(),
119            auth: None,
120            disable_certificate_validation: false,
121            fhir_version: FhirVersion::default(),
122        }
123    }
124}
125
126/// Elasticsearch backend for FHIR resource search.
127///
128/// This backend is designed as a search-optimized secondary in the composite
129/// storage layer. It receives data via sync events from the primary backend
130/// and provides efficient search capabilities.
131pub struct ElasticsearchBackend {
132    /// The Elasticsearch client.
133    client: Elasticsearch,
134    /// Configuration.
135    config: ElasticsearchConfig,
136    /// Search parameter registry (shared with primary for consistency).
137    search_registry: Arc<RwLock<SearchParameterRegistry>>,
138    /// Search parameter extractor.
139    search_extractor: Arc<SearchParameterExtractor>,
140}
141
142impl Debug for ElasticsearchBackend {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        f.debug_struct("ElasticsearchBackend")
145            .field("config", &self.config)
146            .field("search_registry_len", &self.search_registry.read().len())
147            .finish_non_exhaustive()
148    }
149}
150
151impl ElasticsearchBackend {
152    /// Creates a new Elasticsearch backend with the given configuration.
153    pub fn new(config: ElasticsearchConfig) -> StorageResult<Self> {
154        let client = Self::build_client(&config)?;
155
156        // Initialize search parameter registry
157        let search_registry = Arc::new(RwLock::new(SearchParameterRegistry::new()));
158        {
159            let loader = SearchParameterLoader::new(config.fhir_version);
160            let mut registry = search_registry.write();
161
162            // Load embedded fallback params
163            match loader.load_embedded() {
164                Ok(params) => {
165                    for param in params {
166                        let _ = registry.register(param);
167                    }
168                }
169                Err(e) => {
170                    tracing::error!("Failed to load embedded SearchParameters: {}", e);
171                }
172            }
173
174            tracing::info!(
175                "Elasticsearch SearchParameter registry initialized: {} params covering {} resource types",
176                registry.len(),
177                registry.resource_types().len()
178            );
179        }
180        let search_extractor = Arc::new(SearchParameterExtractor::new(search_registry.clone()));
181
182        Ok(Self {
183            client,
184            config,
185            search_registry,
186            search_extractor,
187        })
188    }
189
190    /// Creates a new backend with a shared search parameter registry.
191    ///
192    /// Use this when the ES backend should share its registry with a primary backend.
193    pub fn with_shared_registry(
194        config: ElasticsearchConfig,
195        search_registry: Arc<RwLock<SearchParameterRegistry>>,
196    ) -> StorageResult<Self> {
197        let client = Self::build_client(&config)?;
198        let search_extractor = Arc::new(SearchParameterExtractor::new(search_registry.clone()));
199
200        Ok(Self {
201            client,
202            config,
203            search_registry,
204            search_extractor,
205        })
206    }
207
208    /// Builds the Elasticsearch client from configuration.
209    fn build_client(config: &ElasticsearchConfig) -> StorageResult<Elasticsearch> {
210        let url = config
211            .nodes
212            .first()
213            .cloned()
214            .unwrap_or_else(|| "http://localhost:9200".to_string());
215
216        let parsed_url: elasticsearch::http::Url = url.parse().map_err(|e| {
217            crate::error::StorageError::Backend(BackendError::ConnectionFailed {
218                backend_name: "elasticsearch".to_string(),
219                message: format!("Invalid URL: {}", e),
220            })
221        })?;
222
223        let conn_pool = SingleNodeConnectionPool::new(parsed_url);
224
225        let mut builder = TransportBuilder::new(conn_pool)
226            .timeout(Duration::from_millis(config.request_timeout_ms));
227
228        if config.disable_certificate_validation {
229            builder = builder.cert_validation(CertificateValidation::None);
230        }
231
232        if let Some(ref auth) = config.auth {
233            builder = match auth {
234                ElasticsearchAuth::Basic { username, password } => {
235                    builder.auth(Credentials::Basic(username.clone(), password.clone()))
236                }
237                ElasticsearchAuth::Bearer { token } => {
238                    builder.auth(Credentials::Bearer(token.clone()))
239                }
240            };
241        }
242
243        let transport = builder.build().map_err(|e| {
244            crate::error::StorageError::Backend(BackendError::ConnectionFailed {
245                backend_name: "elasticsearch".to_string(),
246                message: format!("Failed to build transport: {}", e),
247            })
248        })?;
249
250        Ok(Elasticsearch::new(transport))
251    }
252
253    /// Returns the Elasticsearch client.
254    pub(crate) fn client(&self) -> &Elasticsearch {
255        &self.client
256    }
257
258    /// Returns the backend configuration.
259    pub fn config(&self) -> &ElasticsearchConfig {
260        &self.config
261    }
262
263    /// Returns the search parameter registry.
264    #[allow(dead_code)]
265    pub(crate) fn search_registry(&self) -> &Arc<RwLock<SearchParameterRegistry>> {
266        &self.search_registry
267    }
268
269    /// Returns the search parameter extractor.
270    pub(crate) fn search_extractor(&self) -> &Arc<SearchParameterExtractor> {
271        &self.search_extractor
272    }
273
274    /// Returns the index name for a tenant and resource type.
275    pub fn index_name(&self, tenant_id: &str, resource_type: &str) -> String {
276        format!(
277            "{}_{}_{}",
278            self.config.index_prefix,
279            tenant_id.to_lowercase(),
280            resource_type.to_lowercase()
281        )
282    }
283
284    /// Returns the ES document ID for a resource.
285    pub(crate) fn document_id(resource_type: &str, resource_id: &str) -> String {
286        format!("{}_{}", resource_type, resource_id)
287    }
288
289    /// Refreshes an index to make recently indexed documents searchable.
290    ///
291    /// Only needed for testing; in production ES refreshes automatically.
292    pub async fn refresh_index(&self, tenant_id: &str, resource_type: &str) -> StorageResult<()> {
293        let index = self.index_name(tenant_id, resource_type);
294        self.client
295            .indices()
296            .refresh(elasticsearch::indices::IndicesRefreshParts::Index(&[
297                &index,
298            ]))
299            .send()
300            .await
301            .map_err(|e| {
302                crate::error::StorageError::Backend(BackendError::Internal {
303                    backend_name: "elasticsearch".to_string(),
304                    message: format!("Failed to refresh index {}: {}", index, e),
305                    source: None,
306                })
307            })?;
308        Ok(())
309    }
310}
311
/// Unit-struct stand-in for an Elasticsearch "connection".
///
/// The ES client talks HTTP and leaves connection handling to its transport,
/// so there is nothing to carry here; the type exists only to fill the
/// `Connection` associated type of the `Backend` trait.
#[derive(Debug)]
pub struct ElasticsearchConnection;
318
319#[async_trait]
320impl Backend for ElasticsearchBackend {
321    type Connection = ElasticsearchConnection;
322
323    fn kind(&self) -> BackendKind {
324        BackendKind::Elasticsearch
325    }
326
327    fn name(&self) -> &'static str {
328        "elasticsearch"
329    }
330
331    fn supports(&self, capability: BackendCapability) -> bool {
332        matches!(
333            capability,
334            BackendCapability::Crud
335                | BackendCapability::BasicSearch
336                | BackendCapability::DateSearch
337                | BackendCapability::QuantitySearch
338                | BackendCapability::ReferenceSearch
339                | BackendCapability::FullTextSearch
340                | BackendCapability::Sorting
341                | BackendCapability::CursorPagination
342                | BackendCapability::OffsetPagination
343                | BackendCapability::Include
344                | BackendCapability::Revinclude
345                | BackendCapability::SharedSchema
346        )
347    }
348
349    fn capabilities(&self) -> Vec<BackendCapability> {
350        vec![
351            BackendCapability::Crud,
352            BackendCapability::BasicSearch,
353            BackendCapability::DateSearch,
354            BackendCapability::QuantitySearch,
355            BackendCapability::ReferenceSearch,
356            BackendCapability::FullTextSearch,
357            BackendCapability::Sorting,
358            BackendCapability::CursorPagination,
359            BackendCapability::OffsetPagination,
360            BackendCapability::Include,
361            BackendCapability::Revinclude,
362            BackendCapability::SharedSchema,
363        ]
364    }
365
366    async fn acquire(&self) -> Result<Self::Connection, BackendError> {
367        // ES client manages connections internally via HTTP transport
368        Ok(ElasticsearchConnection)
369    }
370
371    async fn release(&self, _conn: Self::Connection) {
372        // No-op: ES client manages connections internally
373    }
374
375    async fn health_check(&self) -> Result<(), BackendError> {
376        let response = self
377            .client
378            .cluster()
379            .health(elasticsearch::cluster::ClusterHealthParts::None)
380            .send()
381            .await
382            .map_err(|e| BackendError::Unavailable {
383                backend_name: "elasticsearch".to_string(),
384                message: format!("Health check failed: {}", e),
385            })?;
386
387        let status = response.status_code();
388        if !status.is_success() {
389            return Err(BackendError::Unavailable {
390                backend_name: "elasticsearch".to_string(),
391                message: format!("Cluster health returned status {}", status),
392            });
393        }
394
395        let body = response
396            .json::<Value>()
397            .await
398            .map_err(|e| BackendError::Internal {
399                backend_name: "elasticsearch".to_string(),
400                message: format!("Failed to parse health response: {}", e),
401                source: None,
402            })?;
403
404        let cluster_status = body
405            .get("status")
406            .and_then(|s| s.as_str())
407            .unwrap_or("unknown");
408
409        if cluster_status == "red" {
410            return Err(BackendError::Unavailable {
411                backend_name: "elasticsearch".to_string(),
412                message: format!("Cluster status is red: {:?}", body),
413            });
414        }
415
416        Ok(())
417    }
418
419    async fn initialize(&self) -> Result<(), BackendError> {
420        // Create index template for automatic index creation
421        super::schema::create_index_template(self)
422            .await
423            .map_err(|e| BackendError::Internal {
424                backend_name: "elasticsearch".to_string(),
425                message: format!("Failed to create index template: {}", e),
426                source: None,
427            })
428    }
429
430    async fn migrate(&self) -> Result<(), BackendError> {
431        // Re-apply index template (idempotent)
432        self.initialize().await
433    }
434}
435
436// ============================================================================
437// SearchCapabilityProvider Implementation
438// ============================================================================
439
440use crate::core::capabilities::{
441    GlobalSearchCapabilities, ResourceSearchCapabilities, SearchCapabilityProvider,
442};
443use crate::types::{
444    IncludeCapability, PaginationCapability, ResultModeCapability, SearchParamFullCapability,
445    SearchParamType, SpecialSearchParam,
446};
447
448impl SearchCapabilityProvider for ElasticsearchBackend {
449    fn resource_search_capabilities(
450        &self,
451        resource_type: &str,
452    ) -> Option<ResourceSearchCapabilities> {
453        let params = {
454            let registry = self.search_registry.read();
455            registry.get_active_params(resource_type)
456        };
457
458        if params.is_empty() {
459            let common_params = {
460                let registry = self.search_registry.read();
461                registry.get_active_params("Resource")
462            };
463            if common_params.is_empty() {
464                return None;
465            }
466        }
467
468        let mut search_params = Vec::new();
469        for param in &params {
470            let mut cap = SearchParamFullCapability::new(&param.code, param.param_type)
471                .with_definition(&param.url);
472            let modifiers = Self::modifiers_for_type(param.param_type);
473            cap = cap.with_modifiers(modifiers);
474            if let Some(ref targets) = param.target {
475                cap = cap.with_targets(targets.iter().map(|s| s.as_str()));
476            }
477            search_params.push(cap);
478        }
479
480        // Add common Resource-level parameters
481        let common_params = {
482            let registry = self.search_registry.read();
483            registry.get_active_params("Resource")
484        };
485        for param in &common_params {
486            if !search_params.iter().any(|p| p.name == param.code) {
487                let mut cap = SearchParamFullCapability::new(&param.code, param.param_type)
488                    .with_definition(&param.url);
489                cap = cap.with_modifiers(Self::modifiers_for_type(param.param_type));
490                search_params.push(cap);
491            }
492        }
493
494        Some(
495            ResourceSearchCapabilities::new(resource_type)
496                .with_special_params(vec![
497                    SpecialSearchParam::Id,
498                    SpecialSearchParam::LastUpdated,
499                    SpecialSearchParam::Tag,
500                    SpecialSearchParam::Profile,
501                    SpecialSearchParam::Security,
502                    SpecialSearchParam::Text,
503                    SpecialSearchParam::Content,
504                ])
505                .with_include_capabilities(vec![
506                    IncludeCapability::Include,
507                    IncludeCapability::Revinclude,
508                ])
509                .with_pagination_capabilities(vec![
510                    PaginationCapability::Count,
511                    PaginationCapability::Offset,
512                    PaginationCapability::Cursor,
513                    PaginationCapability::MaxPageSize(1000),
514                    PaginationCapability::DefaultPageSize(20),
515                ])
516                .with_result_mode_capabilities(vec![
517                    ResultModeCapability::Total,
518                    ResultModeCapability::TotalNone,
519                    ResultModeCapability::TotalAccurate,
520                    ResultModeCapability::SummaryCount,
521                ])
522                .with_param_list(search_params),
523        )
524    }
525
526    fn global_search_capabilities(&self) -> GlobalSearchCapabilities {
527        GlobalSearchCapabilities::new()
528            .with_special_params(vec![
529                SpecialSearchParam::Id,
530                SpecialSearchParam::LastUpdated,
531                SpecialSearchParam::Tag,
532                SpecialSearchParam::Profile,
533                SpecialSearchParam::Security,
534                SpecialSearchParam::Text,
535                SpecialSearchParam::Content,
536            ])
537            .with_pagination(vec![
538                PaginationCapability::Count,
539                PaginationCapability::Offset,
540                PaginationCapability::Cursor,
541                PaginationCapability::MaxPageSize(1000),
542                PaginationCapability::DefaultPageSize(20),
543            ])
544            .with_system_search()
545    }
546}
547
548impl ElasticsearchBackend {
549    /// Returns supported modifiers for a parameter type.
550    ///
551    /// ES supports more modifiers than SQLite, especially for full-text.
552    fn modifiers_for_type(param_type: SearchParamType) -> Vec<&'static str> {
553        match param_type {
554            SearchParamType::String => vec!["exact", "contains", "text", "missing"],
555            SearchParamType::Token => {
556                vec![
557                    "not",
558                    "text",
559                    "text-advanced",
560                    "in",
561                    "not-in",
562                    "of-type",
563                    "missing",
564                ]
565            }
566            SearchParamType::Reference => vec!["identifier", "missing"],
567            SearchParamType::Date => vec!["missing"],
568            SearchParamType::Number => vec!["missing"],
569            SearchParamType::Quantity => vec!["missing"],
570            SearchParamType::Uri => vec!["below", "above", "missing"],
571            SearchParamType::Composite => vec!["missing"],
572            SearchParamType::Special => vec![],
573        }
574    }
575}
576
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a backend from `ElasticsearchConfig::default()`, panicking if
    /// construction fails.
    fn default_backend() -> ElasticsearchBackend {
        ElasticsearchBackend::new(ElasticsearchConfig::default()).unwrap()
    }

    /// The default configuration carries the documented default values.
    #[test]
    fn test_config_defaults() {
        let defaults = ElasticsearchConfig::default();

        assert_eq!(defaults.index_prefix, "hfs");
        assert_eq!(defaults.number_of_shards, 1);
        assert_eq!(defaults.number_of_replicas, 1);
        assert_eq!(defaults.nodes, vec!["http://localhost:9200"]);
    }

    /// Index names combine prefix, tenant and resource type, lowercased.
    #[test]
    fn test_index_name() {
        let backend = default_backend();

        let lower_tenant = backend.index_name("acme", "Patient");
        assert_eq!(lower_tenant, "hfs_acme_patient");

        let upper_tenant = backend.index_name("ACME", "Observation");
        assert_eq!(upper_tenant, "hfs_acme_observation");
    }

    /// Document IDs join resource type and id with an underscore.
    #[test]
    fn test_document_id() {
        let id = ElasticsearchBackend::document_id("Patient", "123");
        assert_eq!(id, "Patient_123");
    }

    /// Search-related capabilities are reported; transactions and chained
    /// search are not.
    #[test]
    fn test_backend_capabilities() {
        let backend = default_backend();

        assert!(backend.supports(BackendCapability::BasicSearch));
        assert!(backend.supports(BackendCapability::FullTextSearch));
        assert!(backend.supports(BackendCapability::CursorPagination));
        assert!(backend.supports(BackendCapability::Sorting));

        assert!(!backend.supports(BackendCapability::Transactions));
        assert!(!backend.supports(BackendCapability::ChainedSearch));
    }

    /// The backend identifies itself as Elasticsearch by kind and by name.
    #[test]
    fn test_backend_kind() {
        let backend = default_backend();

        assert_eq!(backend.kind(), BackendKind::Elasticsearch);
        assert_eq!(backend.name(), "elasticsearch");
    }
}