1use std::fmt::Debug;
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use elasticsearch::Elasticsearch;
9use elasticsearch::auth::Credentials;
10use elasticsearch::cert::CertificateValidation;
11use elasticsearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
12use parking_lot::RwLock;
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15
16use helios_fhir::FhirVersion;
17
18use crate::core::{Backend, BackendCapability, BackendKind};
19use crate::error::{BackendError, StorageResult};
20use crate::search::{SearchParameterExtractor, SearchParameterLoader, SearchParameterRegistry};
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub enum ElasticsearchAuth {
25 Basic {
27 username: String,
29 password: String,
31 },
32 Bearer {
34 token: String,
36 },
37}
38
/// Configuration for the Elasticsearch backend.
///
/// Every field except `nodes` has a serde default, so a serialized
/// configuration only has to provide `nodes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ElasticsearchConfig {
    /// Node URLs. The client built in this file connects to the first entry
    /// only and uses `http://localhost:9200` when the list is empty.
    pub nodes: Vec<String>,

    /// Prefix of every index name (`{prefix}_{tenant}_{resource_type}`).
    /// Defaults to `"hfs"`.
    #[serde(default = "default_index_prefix")]
    pub index_prefix: String,

    /// Defaults to `1`. Not read in this file; presumably the primary shard
    /// count applied when indices are created — TODO confirm against the
    /// schema module.
    #[serde(default = "default_shards")]
    pub number_of_shards: u32,

    /// Defaults to `1`. Not read in this file; presumably the replica count
    /// applied when indices are created — TODO confirm against the schema
    /// module.
    #[serde(default = "default_replicas")]
    pub number_of_replicas: u32,

    /// Defaults to `"1s"`. Not read in this file; presumably the index
    /// `refresh_interval` setting — TODO confirm against the schema module.
    #[serde(default = "default_refresh_interval")]
    pub refresh_interval: String,

    /// Defaults to `10000`. Not read in this file; presumably the index
    /// `max_result_window` setting — TODO confirm against the schema module.
    #[serde(default = "default_max_result_window")]
    pub max_result_window: u32,

    /// Transport timeout in milliseconds. Defaults to `30000`.
    #[serde(default = "default_request_timeout_ms")]
    pub request_timeout_ms: u64,

    /// Optional credentials; when `None` the client is built without any
    /// authentication.
    #[serde(default)]
    pub auth: Option<ElasticsearchAuth>,

    /// When `true`, the client is built with `CertificateValidation::None`.
    /// Defaults to `false`.
    #[serde(default)]
    pub disable_certificate_validation: bool,

    /// FHIR version handed to the search-parameter loader when the backend
    /// creates its own registry. Defaults to `FhirVersion::default()`.
    #[serde(default)]
    pub fhir_version: FhirVersion,
}
84
/// Serde default for `ElasticsearchConfig::index_prefix`: `"hfs"`.
fn default_index_prefix() -> String {
    String::from("hfs")
}
88
/// Serde default for `ElasticsearchConfig::number_of_shards`: `1`.
fn default_shards() -> u32 {
    1_u32
}
92
/// Serde default for `ElasticsearchConfig::number_of_replicas`: `1`.
fn default_replicas() -> u32 {
    1_u32
}
96
/// Serde default for `ElasticsearchConfig::refresh_interval`: `"1s"`.
fn default_refresh_interval() -> String {
    String::from("1s")
}
100
/// Serde default for `ElasticsearchConfig::max_result_window`: `10000`.
fn default_max_result_window() -> u32 {
    10_000
}
104
/// Serde default for `ElasticsearchConfig::request_timeout_ms`: `30000`
/// milliseconds.
fn default_request_timeout_ms() -> u64 {
    30_000
}
108
109impl Default for ElasticsearchConfig {
110 fn default() -> Self {
111 Self {
112 nodes: vec!["http://localhost:9200".to_string()],
113 index_prefix: default_index_prefix(),
114 number_of_shards: default_shards(),
115 number_of_replicas: default_replicas(),
116 refresh_interval: default_refresh_interval(),
117 max_result_window: default_max_result_window(),
118 request_timeout_ms: default_request_timeout_ms(),
119 auth: None,
120 disable_certificate_validation: false,
121 fhir_version: FhirVersion::default(),
122 }
123 }
124}
125
/// Storage backend backed by an Elasticsearch cluster.
///
/// Holds the client, the configuration it was built from, and the
/// search-parameter registry together with an extractor created over that
/// same registry.
pub struct ElasticsearchBackend {
    /// Elasticsearch client built from `config`.
    client: Elasticsearch,
    /// Configuration the backend was constructed with.
    config: ElasticsearchConfig,
    /// Search-parameter registry; either created by `new` or supplied by the
    /// caller of `with_shared_registry`.
    search_registry: Arc<RwLock<SearchParameterRegistry>>,
    /// Extractor constructed from a clone of the `search_registry` handle.
    search_extractor: Arc<SearchParameterExtractor>,
}
141
142impl Debug for ElasticsearchBackend {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 f.debug_struct("ElasticsearchBackend")
145 .field("config", &self.config)
146 .field("search_registry_len", &self.search_registry.read().len())
147 .finish_non_exhaustive()
148 }
149}
150
151impl ElasticsearchBackend {
152 pub fn new(config: ElasticsearchConfig) -> StorageResult<Self> {
154 let client = Self::build_client(&config)?;
155
156 let search_registry = Arc::new(RwLock::new(SearchParameterRegistry::new()));
158 {
159 let loader = SearchParameterLoader::new(config.fhir_version);
160 let mut registry = search_registry.write();
161
162 match loader.load_embedded() {
164 Ok(params) => {
165 for param in params {
166 let _ = registry.register(param);
167 }
168 }
169 Err(e) => {
170 tracing::error!("Failed to load embedded SearchParameters: {}", e);
171 }
172 }
173
174 tracing::info!(
175 "Elasticsearch SearchParameter registry initialized: {} params covering {} resource types",
176 registry.len(),
177 registry.resource_types().len()
178 );
179 }
180 let search_extractor = Arc::new(SearchParameterExtractor::new(search_registry.clone()));
181
182 Ok(Self {
183 client,
184 config,
185 search_registry,
186 search_extractor,
187 })
188 }
189
190 pub fn with_shared_registry(
194 config: ElasticsearchConfig,
195 search_registry: Arc<RwLock<SearchParameterRegistry>>,
196 ) -> StorageResult<Self> {
197 let client = Self::build_client(&config)?;
198 let search_extractor = Arc::new(SearchParameterExtractor::new(search_registry.clone()));
199
200 Ok(Self {
201 client,
202 config,
203 search_registry,
204 search_extractor,
205 })
206 }
207
208 fn build_client(config: &ElasticsearchConfig) -> StorageResult<Elasticsearch> {
210 let url = config
211 .nodes
212 .first()
213 .cloned()
214 .unwrap_or_else(|| "http://localhost:9200".to_string());
215
216 let parsed_url: elasticsearch::http::Url = url.parse().map_err(|e| {
217 crate::error::StorageError::Backend(BackendError::ConnectionFailed {
218 backend_name: "elasticsearch".to_string(),
219 message: format!("Invalid URL: {}", e),
220 })
221 })?;
222
223 let conn_pool = SingleNodeConnectionPool::new(parsed_url);
224
225 let mut builder = TransportBuilder::new(conn_pool)
226 .timeout(Duration::from_millis(config.request_timeout_ms));
227
228 if config.disable_certificate_validation {
229 builder = builder.cert_validation(CertificateValidation::None);
230 }
231
232 if let Some(ref auth) = config.auth {
233 builder = match auth {
234 ElasticsearchAuth::Basic { username, password } => {
235 builder.auth(Credentials::Basic(username.clone(), password.clone()))
236 }
237 ElasticsearchAuth::Bearer { token } => {
238 builder.auth(Credentials::Bearer(token.clone()))
239 }
240 };
241 }
242
243 let transport = builder.build().map_err(|e| {
244 crate::error::StorageError::Backend(BackendError::ConnectionFailed {
245 backend_name: "elasticsearch".to_string(),
246 message: format!("Failed to build transport: {}", e),
247 })
248 })?;
249
250 Ok(Elasticsearch::new(transport))
251 }
252
253 pub(crate) fn client(&self) -> &Elasticsearch {
255 &self.client
256 }
257
258 pub fn config(&self) -> &ElasticsearchConfig {
260 &self.config
261 }
262
263 #[allow(dead_code)]
265 pub(crate) fn search_registry(&self) -> &Arc<RwLock<SearchParameterRegistry>> {
266 &self.search_registry
267 }
268
269 pub(crate) fn search_extractor(&self) -> &Arc<SearchParameterExtractor> {
271 &self.search_extractor
272 }
273
274 pub fn index_name(&self, tenant_id: &str, resource_type: &str) -> String {
276 format!(
277 "{}_{}_{}",
278 self.config.index_prefix,
279 tenant_id.to_lowercase(),
280 resource_type.to_lowercase()
281 )
282 }
283
284 pub(crate) fn document_id(resource_type: &str, resource_id: &str) -> String {
286 format!("{}_{}", resource_type, resource_id)
287 }
288
289 pub async fn refresh_index(&self, tenant_id: &str, resource_type: &str) -> StorageResult<()> {
293 let index = self.index_name(tenant_id, resource_type);
294 self.client
295 .indices()
296 .refresh(elasticsearch::indices::IndicesRefreshParts::Index(&[
297 &index,
298 ]))
299 .send()
300 .await
301 .map_err(|e| {
302 crate::error::StorageError::Backend(BackendError::Internal {
303 backend_name: "elasticsearch".to_string(),
304 message: format!("Failed to refresh index {}: {}", index, e),
305 source: None,
306 })
307 })?;
308 Ok(())
309 }
310}
311
/// Connection handle type for the Elasticsearch backend.
///
/// A field-less marker: `acquire` returns a fresh value and `release` does
/// nothing with it.
#[derive(Debug)]
pub struct ElasticsearchConnection;
318
319#[async_trait]
320impl Backend for ElasticsearchBackend {
321 type Connection = ElasticsearchConnection;
322
323 fn kind(&self) -> BackendKind {
324 BackendKind::Elasticsearch
325 }
326
327 fn name(&self) -> &'static str {
328 "elasticsearch"
329 }
330
331 fn supports(&self, capability: BackendCapability) -> bool {
332 matches!(
333 capability,
334 BackendCapability::Crud
335 | BackendCapability::BasicSearch
336 | BackendCapability::DateSearch
337 | BackendCapability::QuantitySearch
338 | BackendCapability::ReferenceSearch
339 | BackendCapability::FullTextSearch
340 | BackendCapability::Sorting
341 | BackendCapability::CursorPagination
342 | BackendCapability::OffsetPagination
343 | BackendCapability::Include
344 | BackendCapability::Revinclude
345 | BackendCapability::SharedSchema
346 )
347 }
348
349 fn capabilities(&self) -> Vec<BackendCapability> {
350 vec![
351 BackendCapability::Crud,
352 BackendCapability::BasicSearch,
353 BackendCapability::DateSearch,
354 BackendCapability::QuantitySearch,
355 BackendCapability::ReferenceSearch,
356 BackendCapability::FullTextSearch,
357 BackendCapability::Sorting,
358 BackendCapability::CursorPagination,
359 BackendCapability::OffsetPagination,
360 BackendCapability::Include,
361 BackendCapability::Revinclude,
362 BackendCapability::SharedSchema,
363 ]
364 }
365
366 async fn acquire(&self) -> Result<Self::Connection, BackendError> {
367 Ok(ElasticsearchConnection)
369 }
370
371 async fn release(&self, _conn: Self::Connection) {
372 }
374
375 async fn health_check(&self) -> Result<(), BackendError> {
376 let response = self
377 .client
378 .cluster()
379 .health(elasticsearch::cluster::ClusterHealthParts::None)
380 .send()
381 .await
382 .map_err(|e| BackendError::Unavailable {
383 backend_name: "elasticsearch".to_string(),
384 message: format!("Health check failed: {}", e),
385 })?;
386
387 let status = response.status_code();
388 if !status.is_success() {
389 return Err(BackendError::Unavailable {
390 backend_name: "elasticsearch".to_string(),
391 message: format!("Cluster health returned status {}", status),
392 });
393 }
394
395 let body = response
396 .json::<Value>()
397 .await
398 .map_err(|e| BackendError::Internal {
399 backend_name: "elasticsearch".to_string(),
400 message: format!("Failed to parse health response: {}", e),
401 source: None,
402 })?;
403
404 let cluster_status = body
405 .get("status")
406 .and_then(|s| s.as_str())
407 .unwrap_or("unknown");
408
409 if cluster_status == "red" {
410 return Err(BackendError::Unavailable {
411 backend_name: "elasticsearch".to_string(),
412 message: format!("Cluster status is red: {:?}", body),
413 });
414 }
415
416 Ok(())
417 }
418
419 async fn initialize(&self) -> Result<(), BackendError> {
420 super::schema::create_index_template(self)
422 .await
423 .map_err(|e| BackendError::Internal {
424 backend_name: "elasticsearch".to_string(),
425 message: format!("Failed to create index template: {}", e),
426 source: None,
427 })
428 }
429
430 async fn migrate(&self) -> Result<(), BackendError> {
431 self.initialize().await
433 }
434}
435
436use crate::core::capabilities::{
441 GlobalSearchCapabilities, ResourceSearchCapabilities, SearchCapabilityProvider,
442};
443use crate::types::{
444 IncludeCapability, PaginationCapability, ResultModeCapability, SearchParamFullCapability,
445 SearchParamType, SpecialSearchParam,
446};
447
448impl SearchCapabilityProvider for ElasticsearchBackend {
449 fn resource_search_capabilities(
450 &self,
451 resource_type: &str,
452 ) -> Option<ResourceSearchCapabilities> {
453 let params = {
454 let registry = self.search_registry.read();
455 registry.get_active_params(resource_type)
456 };
457
458 if params.is_empty() {
459 let common_params = {
460 let registry = self.search_registry.read();
461 registry.get_active_params("Resource")
462 };
463 if common_params.is_empty() {
464 return None;
465 }
466 }
467
468 let mut search_params = Vec::new();
469 for param in ¶ms {
470 let mut cap = SearchParamFullCapability::new(¶m.code, param.param_type)
471 .with_definition(¶m.url);
472 let modifiers = Self::modifiers_for_type(param.param_type);
473 cap = cap.with_modifiers(modifiers);
474 if let Some(ref targets) = param.target {
475 cap = cap.with_targets(targets.iter().map(|s| s.as_str()));
476 }
477 search_params.push(cap);
478 }
479
480 let common_params = {
482 let registry = self.search_registry.read();
483 registry.get_active_params("Resource")
484 };
485 for param in &common_params {
486 if !search_params.iter().any(|p| p.name == param.code) {
487 let mut cap = SearchParamFullCapability::new(¶m.code, param.param_type)
488 .with_definition(¶m.url);
489 cap = cap.with_modifiers(Self::modifiers_for_type(param.param_type));
490 search_params.push(cap);
491 }
492 }
493
494 Some(
495 ResourceSearchCapabilities::new(resource_type)
496 .with_special_params(vec![
497 SpecialSearchParam::Id,
498 SpecialSearchParam::LastUpdated,
499 SpecialSearchParam::Tag,
500 SpecialSearchParam::Profile,
501 SpecialSearchParam::Security,
502 SpecialSearchParam::Text,
503 SpecialSearchParam::Content,
504 ])
505 .with_include_capabilities(vec![
506 IncludeCapability::Include,
507 IncludeCapability::Revinclude,
508 ])
509 .with_pagination_capabilities(vec![
510 PaginationCapability::Count,
511 PaginationCapability::Offset,
512 PaginationCapability::Cursor,
513 PaginationCapability::MaxPageSize(1000),
514 PaginationCapability::DefaultPageSize(20),
515 ])
516 .with_result_mode_capabilities(vec![
517 ResultModeCapability::Total,
518 ResultModeCapability::TotalNone,
519 ResultModeCapability::TotalAccurate,
520 ResultModeCapability::SummaryCount,
521 ])
522 .with_param_list(search_params),
523 )
524 }
525
526 fn global_search_capabilities(&self) -> GlobalSearchCapabilities {
527 GlobalSearchCapabilities::new()
528 .with_special_params(vec![
529 SpecialSearchParam::Id,
530 SpecialSearchParam::LastUpdated,
531 SpecialSearchParam::Tag,
532 SpecialSearchParam::Profile,
533 SpecialSearchParam::Security,
534 SpecialSearchParam::Text,
535 SpecialSearchParam::Content,
536 ])
537 .with_pagination(vec![
538 PaginationCapability::Count,
539 PaginationCapability::Offset,
540 PaginationCapability::Cursor,
541 PaginationCapability::MaxPageSize(1000),
542 PaginationCapability::DefaultPageSize(20),
543 ])
544 .with_system_search()
545 }
546}
547
548impl ElasticsearchBackend {
549 fn modifiers_for_type(param_type: SearchParamType) -> Vec<&'static str> {
553 match param_type {
554 SearchParamType::String => vec!["exact", "contains", "text", "missing"],
555 SearchParamType::Token => {
556 vec![
557 "not",
558 "text",
559 "text-advanced",
560 "in",
561 "not-in",
562 "of-type",
563 "missing",
564 ]
565 }
566 SearchParamType::Reference => vec!["identifier", "missing"],
567 SearchParamType::Date => vec!["missing"],
568 SearchParamType::Number => vec!["missing"],
569 SearchParamType::Quantity => vec!["missing"],
570 SearchParamType::Uri => vec!["below", "above", "missing"],
571 SearchParamType::Composite => vec!["missing"],
572 SearchParamType::Special => vec![],
573 }
574 }
575}
576
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a backend from the default configuration; panics if
    /// construction fails.
    fn default_backend() -> ElasticsearchBackend {
        ElasticsearchBackend::new(ElasticsearchConfig::default()).unwrap()
    }

    /// The default configuration carries the documented default values.
    #[test]
    fn test_config_defaults() {
        let defaults = ElasticsearchConfig::default();

        assert_eq!(defaults.index_prefix, "hfs");
        assert_eq!(defaults.number_of_shards, 1);
        assert_eq!(defaults.number_of_replicas, 1);
        assert_eq!(defaults.nodes, vec!["http://localhost:9200"]);
    }

    /// Index names join prefix, tenant and resource type, lowercasing the
    /// latter two.
    #[test]
    fn test_index_name() {
        let backend = default_backend();

        assert_eq!(backend.index_name("acme", "Patient"), "hfs_acme_patient");
        assert_eq!(
            backend.index_name("ACME", "Observation"),
            "hfs_acme_observation"
        );
    }

    /// Document ids join resource type and id without changing case.
    #[test]
    fn test_document_id() {
        let id = ElasticsearchBackend::document_id("Patient", "123");
        assert_eq!(id, "Patient_123");
    }

    /// Advertised capabilities are supported; transactions and chained
    /// search are not.
    #[test]
    fn test_backend_capabilities() {
        let backend = default_backend();

        for supported in [
            BackendCapability::BasicSearch,
            BackendCapability::FullTextSearch,
            BackendCapability::CursorPagination,
            BackendCapability::Sorting,
        ] {
            assert!(backend.supports(supported));
        }

        for unsupported in [
            BackendCapability::Transactions,
            BackendCapability::ChainedSearch,
        ] {
            assert!(!backend.supports(unsupported));
        }
    }

    /// Kind and name both identify the backend as Elasticsearch.
    #[test]
    fn test_backend_kind() {
        let backend = default_backend();

        assert_eq!(backend.kind(), BackendKind::Elasticsearch);
        assert_eq!(backend.name(), "elasticsearch");
    }
}