1use async_trait::async_trait;
8use chrono::Utc;
9use elasticsearch::{DeleteParts, GetParts, IndexParts};
10use helios_fhir::FhirVersion;
11use serde_json::{Value, json};
12
13use crate::core::ResourceStorage;
14use crate::error::{BackendError, ResourceError, StorageError, StorageResult};
15use crate::search::converters::IndexValue;
16use crate::search::extractor::ExtractedValue;
17use crate::tenant::TenantContext;
18use crate::types::StoredResource;
19
20use super::backend::ElasticsearchBackend;
21use super::schema;
22
23fn internal_error(message: String) -> StorageError {
24 StorageError::Backend(BackendError::Internal {
25 backend_name: "elasticsearch".to_string(),
26 message,
27 source: None,
28 })
29}
30
/// Free-text views of a resource's JSON, written into the ES document by
/// `build_es_document` as `narrative_text` and `content_text`.
struct SearchableContent {
    /// Text of `text.div` with markup removed (see `extract_narrative`); empty when absent.
    narrative: String,
    /// Every non-empty string value in the resource, space-joined, skipping values
    /// under `div` and `data` keys (see `extract_all_strings`).
    full_content: String,
}
36
37fn extract_searchable_content(resource: &Value) -> SearchableContent {
39 SearchableContent {
40 narrative: extract_narrative(resource),
41 full_content: extract_all_strings(resource),
42 }
43}
44
45fn extract_narrative(resource: &Value) -> String {
47 resource
48 .get("text")
49 .and_then(|t| t.get("div"))
50 .and_then(|d| d.as_str())
51 .map(strip_html_tags)
52 .unwrap_or_default()
53}
54
/// Removes `<...>` markup from `html`, replacing each closed tag with a space,
/// then collapses all whitespace runs to single spaces and trims both ends.
///
/// This is a simple character scanner, not an HTML parser: entities such as
/// `&amp;` are kept verbatim, a `>` outside a tag is kept as text, and an
/// unterminated `<` swallows the rest of the input.
fn strip_html_tags(html: &str) -> String {
    let mut text = String::with_capacity(html.len());
    let mut inside_markup = false;

    for ch in html.chars() {
        if ch == '<' {
            inside_markup = true;
        } else if inside_markup {
            if ch == '>' {
                inside_markup = false;
                // Keep words from adjacent elements apart ("a<br/>b" -> "a b").
                text.push(' ');
            }
        } else {
            text.push(ch);
        }
    }

    let mut collapsed = String::with_capacity(text.len());
    for word in text.split_whitespace() {
        if !collapsed.is_empty() {
            collapsed.push(' ');
        }
        collapsed.push_str(word);
    }
    collapsed
}
75
76fn extract_all_strings(value: &Value) -> String {
78 let mut parts = Vec::new();
79 collect_strings(value, &mut parts);
80 parts.join(" ")
81}
82
83fn collect_strings(value: &Value, parts: &mut Vec<String>) {
84 match value {
85 Value::String(s) => {
86 if !s.is_empty() {
87 parts.push(s.clone());
88 }
89 }
90 Value::Object(map) => {
91 for (key, val) in map {
92 if key == "div" || key == "data" {
94 continue;
95 }
96 collect_strings(val, parts);
97 }
98 }
99 Value::Array(arr) => {
100 for val in arr {
101 collect_strings(val, parts);
102 }
103 }
104 _ => {}
105 }
106}
107
108pub(crate) fn build_es_document(
110 tenant_id: &str,
111 resource_type: &str,
112 resource_id: &str,
113 version_id: &str,
114 content: &Value,
115 fhir_version: FhirVersion,
116 extracted_values: &[ExtractedValue],
117) -> Value {
118 let searchable = extract_searchable_content(content);
119
120 let mut string_params: Vec<Value> = Vec::new();
121 let mut token_params: Vec<Value> = Vec::new();
122 let mut date_params: Vec<Value> = Vec::new();
123 let mut number_params: Vec<Value> = Vec::new();
124 let mut quantity_params: Vec<Value> = Vec::new();
125 let mut reference_params: Vec<Value> = Vec::new();
126 let mut uri_params: Vec<Value> = Vec::new();
127 let mut composite_params: Vec<Value> = Vec::new();
128
129 for ev in extracted_values {
130 match &ev.value {
131 IndexValue::String(s) => {
132 string_params.push(json!({
133 "name": ev.param_name,
134 "value": s,
135 }));
136 }
137 IndexValue::Token {
138 system,
139 code,
140 display,
141 identifier_type_system,
142 identifier_type_code,
143 } => {
144 let mut token = json!({
145 "name": ev.param_name,
146 "code": code,
147 });
148 if let Some(sys) = system {
149 token["system"] = json!(sys);
150 }
151 if let Some(disp) = display {
152 token["display"] = json!(disp);
153 }
154 if let Some(its) = identifier_type_system {
155 token["identifier_type_system"] = json!(its);
156 }
157 if let Some(itc) = identifier_type_code {
158 token["identifier_type_code"] = json!(itc);
159 }
160 token_params.push(token);
161 }
162 IndexValue::Date { value, precision } => {
163 date_params.push(json!({
164 "name": ev.param_name,
165 "value": value,
166 "precision": format!("{:?}", precision).to_lowercase(),
167 }));
168 }
169 IndexValue::Number(n) => {
170 number_params.push(json!({
171 "name": ev.param_name,
172 "value": n,
173 }));
174 }
175 IndexValue::Quantity {
176 value,
177 unit,
178 system,
179 code,
180 } => {
181 let mut qty = json!({
182 "name": ev.param_name,
183 "value": value,
184 });
185 if let Some(u) = unit {
186 qty["unit"] = json!(u);
187 }
188 if let Some(s) = system {
189 qty["system"] = json!(s);
190 }
191 if let Some(c) = code {
192 qty["code"] = json!(c);
193 }
194 quantity_params.push(qty);
195 }
196 IndexValue::Reference {
197 reference,
198 resource_type: ref_type,
199 resource_id: ref_id,
200 } => {
201 let mut ref_doc = json!({
202 "name": ev.param_name,
203 "reference": reference,
204 });
205 if let Some(rt) = ref_type {
206 ref_doc["resource_type"] = json!(rt);
207 }
208 if let Some(ri) = ref_id {
209 ref_doc["resource_id"] = json!(ri);
210 }
211 reference_params.push(ref_doc);
212 }
213 IndexValue::Uri(u) => {
214 uri_params.push(json!({
215 "name": ev.param_name,
216 "value": u,
217 }));
218 }
219 }
220
221 if let Some(group) = ev.composite_group {
222 composite_params.push(json!({
223 "name": ev.param_name,
224 "group_id": group,
225 }));
226 }
227 }
228
229 json!({
230 "resource_type": resource_type,
231 "resource_id": resource_id,
232 "tenant_id": tenant_id,
233 "version_id": version_id,
234 "last_updated": Utc::now().to_rfc3339(),
235 "fhir_version": fhir_version.as_mime_param(),
236 "is_deleted": false,
237 "content": content,
238 "narrative_text": searchable.narrative,
239 "content_text": searchable.full_content,
240 "search_params": {
241 "string": string_params,
242 "token": token_params,
243 "date": date_params,
244 "number": number_params,
245 "quantity": quantity_params,
246 "reference": reference_params,
247 "uri": uri_params,
248 "composite": composite_params,
249 }
250 })
251}
252
253#[async_trait]
254impl ResourceStorage for ElasticsearchBackend {
255 fn backend_name(&self) -> &'static str {
256 "elasticsearch"
257 }
258
259 async fn create(
260 &self,
261 tenant: &TenantContext,
262 resource_type: &str,
263 resource: Value,
264 fhir_version: FhirVersion,
265 ) -> StorageResult<StoredResource> {
266 let tenant_id = tenant.tenant_id().as_str();
267
268 let id = resource
269 .get("id")
270 .and_then(|v| v.as_str())
271 .map(String::from)
272 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
273
274 let version_id = "1";
275
276 let mut resource = resource;
278 if let Some(obj) = resource.as_object_mut() {
279 obj.insert(
280 "resourceType".to_string(),
281 Value::String(resource_type.to_string()),
282 );
283 obj.insert("id".to_string(), Value::String(id.clone()));
284 }
285
286 let extracted_values = self
288 .search_extractor()
289 .extract(&resource, resource_type)
290 .unwrap_or_default();
291
292 let doc = build_es_document(
294 tenant_id,
295 resource_type,
296 &id,
297 version_id,
298 &resource,
299 fhir_version,
300 &extracted_values,
301 );
302
303 schema::ensure_index(self, tenant_id, resource_type).await?;
305
306 let index = self.index_name(tenant_id, resource_type);
308 let doc_id = Self::document_id(resource_type, &id);
309
310 let response = self
311 .client()
312 .index(IndexParts::IndexId(&index, &doc_id))
313 .body(doc)
314 .send()
315 .await
316 .map_err(|e| internal_error(format!("Failed to index document: {}", e)))?;
317
318 let status = response.status_code();
319 if !status.is_success() {
320 let body = response.text().await.unwrap_or_default();
321 return Err(internal_error(format!(
322 "Failed to index document (status {}): {}",
323 status, body
324 )));
325 }
326
327 let now = Utc::now();
328 Ok(StoredResource::from_storage(
329 resource_type,
330 &id,
331 version_id,
332 tenant.tenant_id().clone(),
333 resource,
334 now,
335 now,
336 None,
337 fhir_version,
338 ))
339 }
340
341 async fn create_or_update(
342 &self,
343 tenant: &TenantContext,
344 resource_type: &str,
345 id: &str,
346 resource: Value,
347 fhir_version: FhirVersion,
348 ) -> StorageResult<(StoredResource, bool)> {
349 let tenant_id = tenant.tenant_id().as_str();
350
351 let index = self.index_name(tenant_id, resource_type);
353 let doc_id = Self::document_id(resource_type, id);
354
355 let existing = self
356 .client()
357 .get(GetParts::IndexId(&index, &doc_id))
358 .send()
359 .await;
360
361 let (version_id, is_new) = match existing {
362 Ok(resp) if resp.status_code().is_success() => {
363 let body = resp.json::<Value>().await.unwrap_or_default();
364 let current_version: u64 = body
365 .get("_source")
366 .and_then(|s| s.get("version_id"))
367 .and_then(|v| v.as_str())
368 .and_then(|v| v.parse().ok())
369 .unwrap_or(0);
370 ((current_version + 1).to_string(), false)
371 }
372 _ => ("1".to_string(), true),
373 };
374
375 let mut resource = resource;
377 if let Some(obj) = resource.as_object_mut() {
378 obj.insert(
379 "resourceType".to_string(),
380 Value::String(resource_type.to_string()),
381 );
382 obj.insert("id".to_string(), Value::String(id.to_string()));
383 }
384
385 let extracted_values = self
387 .search_extractor()
388 .extract(&resource, resource_type)
389 .unwrap_or_default();
390
391 let doc = build_es_document(
392 tenant_id,
393 resource_type,
394 id,
395 &version_id,
396 &resource,
397 fhir_version,
398 &extracted_values,
399 );
400
401 schema::ensure_index(self, tenant_id, resource_type).await?;
403
404 let response = self
405 .client()
406 .index(IndexParts::IndexId(&index, &doc_id))
407 .body(doc)
408 .send()
409 .await
410 .map_err(|e| internal_error(format!("Failed to index document: {}", e)))?;
411
412 let status = response.status_code();
413 if !status.is_success() {
414 let body = response.text().await.unwrap_or_default();
415 return Err(internal_error(format!(
416 "Failed to index document (status {}): {}",
417 status, body
418 )));
419 }
420
421 let now = Utc::now();
422 Ok((
423 StoredResource::from_storage(
424 resource_type,
425 id,
426 &version_id,
427 tenant.tenant_id().clone(),
428 resource,
429 now,
430 now,
431 None,
432 fhir_version,
433 ),
434 is_new,
435 ))
436 }
437
438 async fn read(
439 &self,
440 tenant: &TenantContext,
441 resource_type: &str,
442 id: &str,
443 ) -> StorageResult<Option<StoredResource>> {
444 let tenant_id = tenant.tenant_id().as_str();
445 let index = self.index_name(tenant_id, resource_type);
446 let doc_id = Self::document_id(resource_type, id);
447
448 let response = self
449 .client()
450 .get(GetParts::IndexId(&index, &doc_id))
451 .send()
452 .await;
453
454 let response = match response {
455 Ok(r) => r,
456 Err(_) => return Ok(None),
457 };
458
459 if !response.status_code().is_success() {
460 return Ok(None);
461 }
462
463 let body: Value = response
464 .json()
465 .await
466 .map_err(|e| internal_error(format!("Failed to parse ES response: {}", e)))?;
467
468 let source = match body.get("_source") {
469 Some(s) => s,
470 None => return Ok(None),
471 };
472
473 if source
475 .get("is_deleted")
476 .and_then(|v| v.as_bool())
477 .unwrap_or(false)
478 {
479 return Ok(None);
480 }
481
482 let doc_tenant = source
484 .get("tenant_id")
485 .and_then(|v| v.as_str())
486 .unwrap_or("");
487 if doc_tenant != tenant_id {
488 return Ok(None);
489 }
490
491 parse_stored_resource(source, tenant)
492 }
493
494 async fn update(
495 &self,
496 tenant: &TenantContext,
497 current: &StoredResource,
498 resource: Value,
499 ) -> StorageResult<StoredResource> {
500 let tenant_id = tenant.tenant_id().as_str();
501 let resource_type = current.resource_type();
502 let id = current.id();
503 let new_version: u64 = current.version_id().parse::<u64>().unwrap_or(0) + 1;
504 let version_id = new_version.to_string();
505 let fhir_version = current.fhir_version();
506
507 let mut resource = resource;
508 if let Some(obj) = resource.as_object_mut() {
509 obj.insert(
510 "resourceType".to_string(),
511 Value::String(resource_type.to_string()),
512 );
513 obj.insert("id".to_string(), Value::String(id.to_string()));
514 }
515
516 let extracted_values = self
517 .search_extractor()
518 .extract(&resource, resource_type)
519 .unwrap_or_default();
520
521 let doc = build_es_document(
522 tenant_id,
523 resource_type,
524 id,
525 &version_id,
526 &resource,
527 fhir_version,
528 &extracted_values,
529 );
530
531 schema::ensure_index(self, tenant_id, resource_type).await?;
532
533 let index = self.index_name(tenant_id, resource_type);
534 let doc_id = Self::document_id(resource_type, id);
535
536 let response = self
537 .client()
538 .index(IndexParts::IndexId(&index, &doc_id))
539 .body(doc)
540 .send()
541 .await
542 .map_err(|e| internal_error(format!("Failed to update document: {}", e)))?;
543
544 let status = response.status_code();
545 if !status.is_success() {
546 let body = response.text().await.unwrap_or_default();
547 return Err(internal_error(format!(
548 "Failed to update document (status {}): {}",
549 status, body
550 )));
551 }
552
553 let now = Utc::now();
554 Ok(StoredResource::from_storage(
555 resource_type,
556 id,
557 &version_id,
558 tenant.tenant_id().clone(),
559 resource,
560 now,
561 now,
562 None,
563 fhir_version,
564 ))
565 }
566
567 async fn delete(
568 &self,
569 tenant: &TenantContext,
570 resource_type: &str,
571 id: &str,
572 ) -> StorageResult<()> {
573 let tenant_id = tenant.tenant_id().as_str();
574 let index = self.index_name(tenant_id, resource_type);
575 let doc_id = Self::document_id(resource_type, id);
576
577 let response = self
578 .client()
579 .delete(DeleteParts::IndexId(&index, &doc_id))
580 .send()
581 .await
582 .map_err(|e| internal_error(format!("Failed to delete document: {}", e)))?;
583
584 let status = response.status_code();
585 if !status.is_success() {
586 if status.as_u16() == 404 {
587 return Err(StorageError::Resource(ResourceError::NotFound {
588 resource_type: resource_type.to_string(),
589 id: id.to_string(),
590 }));
591 }
592 let body = response.text().await.unwrap_or_default();
593 return Err(internal_error(format!(
594 "Failed to delete document (status {}): {}",
595 status, body
596 )));
597 }
598
599 Ok(())
600 }
601
602 async fn count(
603 &self,
604 tenant: &TenantContext,
605 resource_type: Option<&str>,
606 ) -> StorageResult<u64> {
607 let tenant_id = tenant.tenant_id().as_str();
608
609 let index_pattern = match resource_type {
610 Some(rt) => self.index_name(tenant_id, rt),
611 None => format!(
612 "{}_{}_*",
613 self.config().index_prefix,
614 tenant_id.to_lowercase()
615 ),
616 };
617
618 let query = json!({
619 "query": {
620 "bool": {
621 "filter": [
622 { "term": { "tenant_id": tenant_id } },
623 { "term": { "is_deleted": false } }
624 ]
625 }
626 }
627 });
628
629 let response = self
630 .client()
631 .count(elasticsearch::CountParts::Index(&[&index_pattern]))
632 .body(query)
633 .send()
634 .await;
635
636 match response {
637 Ok(resp) if resp.status_code().is_success() => {
638 let body: Value = resp.json().await.unwrap_or_default();
639 Ok(body.get("count").and_then(|c| c.as_u64()).unwrap_or(0))
640 }
641 _ => Ok(0),
643 }
644 }
645}
646
647fn parse_stored_resource(
649 source: &Value,
650 tenant: &TenantContext,
651) -> StorageResult<Option<StoredResource>> {
652 let resource_type = source
653 .get("resource_type")
654 .and_then(|v| v.as_str())
655 .ok_or_else(|| internal_error("Missing resource_type in ES document".to_string()))?;
656
657 let resource_id = source
658 .get("resource_id")
659 .and_then(|v| v.as_str())
660 .ok_or_else(|| internal_error("Missing resource_id in ES document".to_string()))?;
661
662 let version_id = source
663 .get("version_id")
664 .and_then(|v| v.as_str())
665 .unwrap_or("1");
666
667 let content = source.get("content").cloned().unwrap_or_else(|| json!({}));
668
669 let fhir_version_str = source
670 .get("fhir_version")
671 .and_then(|v| v.as_str())
672 .unwrap_or("4.0");
673
674 let fhir_version = FhirVersion::from_mime_param(fhir_version_str).unwrap_or_default();
675
676 let last_updated = source
677 .get("last_updated")
678 .and_then(|v| v.as_str())
679 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
680 .map(|dt| dt.with_timezone(&Utc))
681 .unwrap_or_else(Utc::now);
682
683 Ok(Some(StoredResource::from_storage(
684 resource_type,
685 resource_id,
686 version_id,
687 tenant.tenant_id().clone(),
688 content,
689 last_updated,
690 last_updated,
691 None,
692 fhir_version,
693 )))
694}