Skip to main content

helios_persistence/backends/elasticsearch/
storage.rs

1//! ResourceStorage implementation for Elasticsearch.
2//!
3//! Provides the minimal CRUD operations needed for the SyncManager to propagate
4//! changes from the primary backend. The ES backend is primarily a search secondary,
5//! but it must implement ResourceStorage for sync support.
6
7use async_trait::async_trait;
8use chrono::Utc;
9use elasticsearch::{DeleteParts, GetParts, IndexParts};
10use helios_fhir::FhirVersion;
11use serde_json::{Value, json};
12
13use crate::core::ResourceStorage;
14use crate::error::{BackendError, ResourceError, StorageError, StorageResult};
15use crate::search::converters::IndexValue;
16use crate::search::extractor::ExtractedValue;
17use crate::tenant::TenantContext;
18use crate::types::StoredResource;
19
20use super::backend::ElasticsearchBackend;
21use super::schema;
22
23fn internal_error(message: String) -> StorageError {
24    StorageError::Backend(BackendError::Internal {
25        backend_name: "elasticsearch".to_string(),
26        message,
27        source: None,
28    })
29}
30
/// Plain-text views of a resource used for full-text search indexing.
struct SearchableContent {
    /// Text of the resource narrative (`text.div`) with markup removed.
    narrative: String,
    /// Every non-empty string value in the resource, space-separated.
    full_content: String,
}
36
37/// Extracts searchable text content from a FHIR resource.
38fn extract_searchable_content(resource: &Value) -> SearchableContent {
39    SearchableContent {
40        narrative: extract_narrative(resource),
41        full_content: extract_all_strings(resource),
42    }
43}
44
45/// Extracts narrative text from resource.text.div, stripping HTML tags.
46fn extract_narrative(resource: &Value) -> String {
47    resource
48        .get("text")
49        .and_then(|t| t.get("div"))
50        .and_then(|d| d.as_str())
51        .map(strip_html_tags)
52        .unwrap_or_default()
53}
54
/// Strips markup from an (X)HTML narrative fragment, returning plain text.
///
/// Each tag is replaced by a single space, so adjacent elements such as
/// `<p>a</p><p>b</p>` do not run together. Character and entity references
/// left in the text are then decoded (`&amp;` → `&`, `&#65;` → `A`). Finally,
/// runs of whitespace are collapsed to single spaces and the ends are trimmed.
///
/// Tags are removed *before* references are decoded, so an escaped `&lt;` in
/// the text can never be mistaken for the start of a tag.
fn strip_html_tags(html: &str) -> String {
    let mut text = String::with_capacity(html.len());
    let mut in_tag = false;

    for c in html.chars() {
        match c {
            '<' => in_tag = true,
            '>' if in_tag => {
                in_tag = false;
                text.push(' ');
            }
            _ if !in_tag => text.push(c),
            _ => {}
        }
    }

    // Normalize whitespace (this also folds the decoded `&nbsp;`, U+00A0).
    decode_xml_entities(&text)
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ")
}

/// Decodes XML character references and common entity references in `text`.
///
/// Recognises `&amp;`, `&lt;`, `&gt;`, `&quot;`, `&apos;` and `&nbsp;`, plus
/// decimal (`&#65;`) and hexadecimal (`&#x41;`) references. Anything else is
/// copied through verbatim, including a bare `&` or an unknown name.
fn decode_xml_entities(text: &str) -> String {
    // Longest name accepted between '&' and ';' (e.g. "#x10FFFF"). Bounding the
    // scan keeps text with many stray '&' characters linear.
    const MAX_ENTITY_NAME_LEN: usize = 10;

    let mut out = String::with_capacity(text.len());
    let mut rest = text;
    while let Some(amp) = rest.find('&') {
        out.push_str(&rest[..amp]);
        let tail = &rest[amp..];
        // `semi` is the byte offset of ';' after the '&'. Both bytes are ASCII,
        // so every slice below falls on a char boundary.
        let decoded = tail
            .bytes()
            .skip(1)
            .take(MAX_ENTITY_NAME_LEN + 1)
            .position(|b| b == b';')
            .and_then(|semi| decode_xml_entity(&tail[1..1 + semi]).map(|c| (c, semi + 2)));
        match decoded {
            Some((c, consumed)) => {
                out.push(c);
                rest = &tail[consumed..];
            }
            None => {
                out.push('&');
                rest = &tail[1..];
            }
        }
    }
    out.push_str(rest);
    out
}

/// Maps an entity name (the text between `&` and `;`) to its character, if known.
fn decode_xml_entity(name: &str) -> Option<char> {
    let code = match name {
        "amp" => return Some('&'),
        "lt" => return Some('<'),
        "gt" => return Some('>'),
        "quot" => return Some('"'),
        "apos" => return Some('\''),
        "nbsp" => return Some('\u{a0}'),
        _ => {
            let numeric = name.strip_prefix('#')?;
            match numeric
                .strip_prefix('x')
                .or_else(|| numeric.strip_prefix('X'))
            {
                Some(hex) => u32::from_str_radix(hex, 16).ok()?,
                None => numeric.parse::<u32>().ok()?,
            }
        }
    };
    char::from_u32(code)
}
75
76/// Extracts all string values from a JSON value recursively.
77fn extract_all_strings(value: &Value) -> String {
78    let mut parts = Vec::new();
79    collect_strings(value, &mut parts);
80    parts.join(" ")
81}
82
83fn collect_strings(value: &Value, parts: &mut Vec<String>) {
84    match value {
85        Value::String(s) => {
86            if !s.is_empty() {
87                parts.push(s.clone());
88            }
89        }
90        Value::Object(map) => {
91            for (key, val) in map {
92                // Skip metadata fields and large binary data
93                if key == "div" || key == "data" {
94                    continue;
95                }
96                collect_strings(val, parts);
97            }
98        }
99        Value::Array(arr) => {
100            for val in arr {
101                collect_strings(val, parts);
102            }
103        }
104        _ => {}
105    }
106}
107
108/// Builds an ES document from a FHIR resource and its extracted search values.
109pub(crate) fn build_es_document(
110    tenant_id: &str,
111    resource_type: &str,
112    resource_id: &str,
113    version_id: &str,
114    content: &Value,
115    fhir_version: FhirVersion,
116    extracted_values: &[ExtractedValue],
117) -> Value {
118    let searchable = extract_searchable_content(content);
119
120    let mut string_params: Vec<Value> = Vec::new();
121    let mut token_params: Vec<Value> = Vec::new();
122    let mut date_params: Vec<Value> = Vec::new();
123    let mut number_params: Vec<Value> = Vec::new();
124    let mut quantity_params: Vec<Value> = Vec::new();
125    let mut reference_params: Vec<Value> = Vec::new();
126    let mut uri_params: Vec<Value> = Vec::new();
127    let mut composite_params: Vec<Value> = Vec::new();
128
129    for ev in extracted_values {
130        match &ev.value {
131            IndexValue::String(s) => {
132                string_params.push(json!({
133                    "name": ev.param_name,
134                    "value": s,
135                }));
136            }
137            IndexValue::Token {
138                system,
139                code,
140                display,
141                identifier_type_system,
142                identifier_type_code,
143            } => {
144                let mut token = json!({
145                    "name": ev.param_name,
146                    "code": code,
147                });
148                if let Some(sys) = system {
149                    token["system"] = json!(sys);
150                }
151                if let Some(disp) = display {
152                    token["display"] = json!(disp);
153                }
154                if let Some(its) = identifier_type_system {
155                    token["identifier_type_system"] = json!(its);
156                }
157                if let Some(itc) = identifier_type_code {
158                    token["identifier_type_code"] = json!(itc);
159                }
160                token_params.push(token);
161            }
162            IndexValue::Date { value, precision } => {
163                date_params.push(json!({
164                    "name": ev.param_name,
165                    "value": value,
166                    "precision": format!("{:?}", precision).to_lowercase(),
167                }));
168            }
169            IndexValue::Number(n) => {
170                number_params.push(json!({
171                    "name": ev.param_name,
172                    "value": n,
173                }));
174            }
175            IndexValue::Quantity {
176                value,
177                unit,
178                system,
179                code,
180            } => {
181                let mut qty = json!({
182                    "name": ev.param_name,
183                    "value": value,
184                });
185                if let Some(u) = unit {
186                    qty["unit"] = json!(u);
187                }
188                if let Some(s) = system {
189                    qty["system"] = json!(s);
190                }
191                if let Some(c) = code {
192                    qty["code"] = json!(c);
193                }
194                quantity_params.push(qty);
195            }
196            IndexValue::Reference {
197                reference,
198                resource_type: ref_type,
199                resource_id: ref_id,
200            } => {
201                let mut ref_doc = json!({
202                    "name": ev.param_name,
203                    "reference": reference,
204                });
205                if let Some(rt) = ref_type {
206                    ref_doc["resource_type"] = json!(rt);
207                }
208                if let Some(ri) = ref_id {
209                    ref_doc["resource_id"] = json!(ri);
210                }
211                reference_params.push(ref_doc);
212            }
213            IndexValue::Uri(u) => {
214                uri_params.push(json!({
215                    "name": ev.param_name,
216                    "value": u,
217                }));
218            }
219        }
220
221        if let Some(group) = ev.composite_group {
222            composite_params.push(json!({
223                "name": ev.param_name,
224                "group_id": group,
225            }));
226        }
227    }
228
229    json!({
230        "resource_type": resource_type,
231        "resource_id": resource_id,
232        "tenant_id": tenant_id,
233        "version_id": version_id,
234        "last_updated": Utc::now().to_rfc3339(),
235        "fhir_version": fhir_version.as_mime_param(),
236        "is_deleted": false,
237        "content": content,
238        "narrative_text": searchable.narrative,
239        "content_text": searchable.full_content,
240        "search_params": {
241            "string": string_params,
242            "token": token_params,
243            "date": date_params,
244            "number": number_params,
245            "quantity": quantity_params,
246            "reference": reference_params,
247            "uri": uri_params,
248            "composite": composite_params,
249        }
250    })
251}
252
253#[async_trait]
254impl ResourceStorage for ElasticsearchBackend {
255    fn backend_name(&self) -> &'static str {
256        "elasticsearch"
257    }
258
259    async fn create(
260        &self,
261        tenant: &TenantContext,
262        resource_type: &str,
263        resource: Value,
264        fhir_version: FhirVersion,
265    ) -> StorageResult<StoredResource> {
266        let tenant_id = tenant.tenant_id().as_str();
267
268        let id = resource
269            .get("id")
270            .and_then(|v| v.as_str())
271            .map(String::from)
272            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
273
274        let version_id = "1";
275
276        // Ensure the resource has correct type and id
277        let mut resource = resource;
278        if let Some(obj) = resource.as_object_mut() {
279            obj.insert(
280                "resourceType".to_string(),
281                Value::String(resource_type.to_string()),
282            );
283            obj.insert("id".to_string(), Value::String(id.clone()));
284        }
285
286        // Extract search parameters
287        let extracted_values = self
288            .search_extractor()
289            .extract(&resource, resource_type)
290            .unwrap_or_default();
291
292        // Build ES document
293        let doc = build_es_document(
294            tenant_id,
295            resource_type,
296            &id,
297            version_id,
298            &resource,
299            fhir_version,
300            &extracted_values,
301        );
302
303        // Ensure index exists
304        schema::ensure_index(self, tenant_id, resource_type).await?;
305
306        // Index the document
307        let index = self.index_name(tenant_id, resource_type);
308        let doc_id = Self::document_id(resource_type, &id);
309
310        let response = self
311            .client()
312            .index(IndexParts::IndexId(&index, &doc_id))
313            .body(doc)
314            .send()
315            .await
316            .map_err(|e| internal_error(format!("Failed to index document: {}", e)))?;
317
318        let status = response.status_code();
319        if !status.is_success() {
320            let body = response.text().await.unwrap_or_default();
321            return Err(internal_error(format!(
322                "Failed to index document (status {}): {}",
323                status, body
324            )));
325        }
326
327        let now = Utc::now();
328        Ok(StoredResource::from_storage(
329            resource_type,
330            &id,
331            version_id,
332            tenant.tenant_id().clone(),
333            resource,
334            now,
335            now,
336            None,
337            fhir_version,
338        ))
339    }
340
341    async fn create_or_update(
342        &self,
343        tenant: &TenantContext,
344        resource_type: &str,
345        id: &str,
346        resource: Value,
347        fhir_version: FhirVersion,
348    ) -> StorageResult<(StoredResource, bool)> {
349        let tenant_id = tenant.tenant_id().as_str();
350
351        // Check if document exists
352        let index = self.index_name(tenant_id, resource_type);
353        let doc_id = Self::document_id(resource_type, id);
354
355        let existing = self
356            .client()
357            .get(GetParts::IndexId(&index, &doc_id))
358            .send()
359            .await;
360
361        let (version_id, is_new) = match existing {
362            Ok(resp) if resp.status_code().is_success() => {
363                let body = resp.json::<Value>().await.unwrap_or_default();
364                let current_version: u64 = body
365                    .get("_source")
366                    .and_then(|s| s.get("version_id"))
367                    .and_then(|v| v.as_str())
368                    .and_then(|v| v.parse().ok())
369                    .unwrap_or(0);
370                ((current_version + 1).to_string(), false)
371            }
372            _ => ("1".to_string(), true),
373        };
374
375        // Ensure resource has correct type and id
376        let mut resource = resource;
377        if let Some(obj) = resource.as_object_mut() {
378            obj.insert(
379                "resourceType".to_string(),
380                Value::String(resource_type.to_string()),
381            );
382            obj.insert("id".to_string(), Value::String(id.to_string()));
383        }
384
385        // Extract search parameters
386        let extracted_values = self
387            .search_extractor()
388            .extract(&resource, resource_type)
389            .unwrap_or_default();
390
391        let doc = build_es_document(
392            tenant_id,
393            resource_type,
394            id,
395            &version_id,
396            &resource,
397            fhir_version,
398            &extracted_values,
399        );
400
401        // Ensure index exists
402        schema::ensure_index(self, tenant_id, resource_type).await?;
403
404        let response = self
405            .client()
406            .index(IndexParts::IndexId(&index, &doc_id))
407            .body(doc)
408            .send()
409            .await
410            .map_err(|e| internal_error(format!("Failed to index document: {}", e)))?;
411
412        let status = response.status_code();
413        if !status.is_success() {
414            let body = response.text().await.unwrap_or_default();
415            return Err(internal_error(format!(
416                "Failed to index document (status {}): {}",
417                status, body
418            )));
419        }
420
421        let now = Utc::now();
422        Ok((
423            StoredResource::from_storage(
424                resource_type,
425                id,
426                &version_id,
427                tenant.tenant_id().clone(),
428                resource,
429                now,
430                now,
431                None,
432                fhir_version,
433            ),
434            is_new,
435        ))
436    }
437
438    async fn read(
439        &self,
440        tenant: &TenantContext,
441        resource_type: &str,
442        id: &str,
443    ) -> StorageResult<Option<StoredResource>> {
444        let tenant_id = tenant.tenant_id().as_str();
445        let index = self.index_name(tenant_id, resource_type);
446        let doc_id = Self::document_id(resource_type, id);
447
448        let response = self
449            .client()
450            .get(GetParts::IndexId(&index, &doc_id))
451            .send()
452            .await;
453
454        let response = match response {
455            Ok(r) => r,
456            Err(_) => return Ok(None),
457        };
458
459        if !response.status_code().is_success() {
460            return Ok(None);
461        }
462
463        let body: Value = response
464            .json()
465            .await
466            .map_err(|e| internal_error(format!("Failed to parse ES response: {}", e)))?;
467
468        let source = match body.get("_source") {
469            Some(s) => s,
470            None => return Ok(None),
471        };
472
473        // Check if deleted
474        if source
475            .get("is_deleted")
476            .and_then(|v| v.as_bool())
477            .unwrap_or(false)
478        {
479            return Ok(None);
480        }
481
482        // Verify tenant
483        let doc_tenant = source
484            .get("tenant_id")
485            .and_then(|v| v.as_str())
486            .unwrap_or("");
487        if doc_tenant != tenant_id {
488            return Ok(None);
489        }
490
491        parse_stored_resource(source, tenant)
492    }
493
494    async fn update(
495        &self,
496        tenant: &TenantContext,
497        current: &StoredResource,
498        resource: Value,
499    ) -> StorageResult<StoredResource> {
500        let tenant_id = tenant.tenant_id().as_str();
501        let resource_type = current.resource_type();
502        let id = current.id();
503        let new_version: u64 = current.version_id().parse::<u64>().unwrap_or(0) + 1;
504        let version_id = new_version.to_string();
505        let fhir_version = current.fhir_version();
506
507        let mut resource = resource;
508        if let Some(obj) = resource.as_object_mut() {
509            obj.insert(
510                "resourceType".to_string(),
511                Value::String(resource_type.to_string()),
512            );
513            obj.insert("id".to_string(), Value::String(id.to_string()));
514        }
515
516        let extracted_values = self
517            .search_extractor()
518            .extract(&resource, resource_type)
519            .unwrap_or_default();
520
521        let doc = build_es_document(
522            tenant_id,
523            resource_type,
524            id,
525            &version_id,
526            &resource,
527            fhir_version,
528            &extracted_values,
529        );
530
531        schema::ensure_index(self, tenant_id, resource_type).await?;
532
533        let index = self.index_name(tenant_id, resource_type);
534        let doc_id = Self::document_id(resource_type, id);
535
536        let response = self
537            .client()
538            .index(IndexParts::IndexId(&index, &doc_id))
539            .body(doc)
540            .send()
541            .await
542            .map_err(|e| internal_error(format!("Failed to update document: {}", e)))?;
543
544        let status = response.status_code();
545        if !status.is_success() {
546            let body = response.text().await.unwrap_or_default();
547            return Err(internal_error(format!(
548                "Failed to update document (status {}): {}",
549                status, body
550            )));
551        }
552
553        let now = Utc::now();
554        Ok(StoredResource::from_storage(
555            resource_type,
556            id,
557            &version_id,
558            tenant.tenant_id().clone(),
559            resource,
560            now,
561            now,
562            None,
563            fhir_version,
564        ))
565    }
566
567    async fn delete(
568        &self,
569        tenant: &TenantContext,
570        resource_type: &str,
571        id: &str,
572    ) -> StorageResult<()> {
573        let tenant_id = tenant.tenant_id().as_str();
574        let index = self.index_name(tenant_id, resource_type);
575        let doc_id = Self::document_id(resource_type, id);
576
577        let response = self
578            .client()
579            .delete(DeleteParts::IndexId(&index, &doc_id))
580            .send()
581            .await
582            .map_err(|e| internal_error(format!("Failed to delete document: {}", e)))?;
583
584        let status = response.status_code();
585        if !status.is_success() {
586            if status.as_u16() == 404 {
587                return Err(StorageError::Resource(ResourceError::NotFound {
588                    resource_type: resource_type.to_string(),
589                    id: id.to_string(),
590                }));
591            }
592            let body = response.text().await.unwrap_or_default();
593            return Err(internal_error(format!(
594                "Failed to delete document (status {}): {}",
595                status, body
596            )));
597        }
598
599        Ok(())
600    }
601
602    async fn count(
603        &self,
604        tenant: &TenantContext,
605        resource_type: Option<&str>,
606    ) -> StorageResult<u64> {
607        let tenant_id = tenant.tenant_id().as_str();
608
609        let index_pattern = match resource_type {
610            Some(rt) => self.index_name(tenant_id, rt),
611            None => format!(
612                "{}_{}_*",
613                self.config().index_prefix,
614                tenant_id.to_lowercase()
615            ),
616        };
617
618        let query = json!({
619            "query": {
620                "bool": {
621                    "filter": [
622                        { "term": { "tenant_id": tenant_id } },
623                        { "term": { "is_deleted": false } }
624                    ]
625                }
626            }
627        });
628
629        let response = self
630            .client()
631            .count(elasticsearch::CountParts::Index(&[&index_pattern]))
632            .body(query)
633            .send()
634            .await;
635
636        match response {
637            Ok(resp) if resp.status_code().is_success() => {
638                let body: Value = resp.json().await.unwrap_or_default();
639                Ok(body.get("count").and_then(|c| c.as_u64()).unwrap_or(0))
640            }
641            // If index doesn't exist, count is 0
642            _ => Ok(0),
643        }
644    }
645}
646
647/// Parses a StoredResource from an ES `_source` document.
648fn parse_stored_resource(
649    source: &Value,
650    tenant: &TenantContext,
651) -> StorageResult<Option<StoredResource>> {
652    let resource_type = source
653        .get("resource_type")
654        .and_then(|v| v.as_str())
655        .ok_or_else(|| internal_error("Missing resource_type in ES document".to_string()))?;
656
657    let resource_id = source
658        .get("resource_id")
659        .and_then(|v| v.as_str())
660        .ok_or_else(|| internal_error("Missing resource_id in ES document".to_string()))?;
661
662    let version_id = source
663        .get("version_id")
664        .and_then(|v| v.as_str())
665        .unwrap_or("1");
666
667    let content = source.get("content").cloned().unwrap_or_else(|| json!({}));
668
669    let fhir_version_str = source
670        .get("fhir_version")
671        .and_then(|v| v.as_str())
672        .unwrap_or("4.0");
673
674    let fhir_version = FhirVersion::from_mime_param(fhir_version_str).unwrap_or_default();
675
676    let last_updated = source
677        .get("last_updated")
678        .and_then(|v| v.as_str())
679        .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
680        .map(|dt| dt.with_timezone(&Utc))
681        .unwrap_or_else(Utc::now);
682
683    Ok(Some(StoredResource::from_storage(
684        resource_type,
685        resource_id,
686        version_id,
687        tenant.tenant_id().clone(),
688        content,
689        last_updated,
690        last_updated,
691        None,
692        fhir_version,
693    )))
694}