use async_trait::async_trait;
use elasticsearch::SearchParts;
use serde_json::{Value, json};
use crate::core::ResourceStorage;
use crate::core::search::{
IncludeProvider, RevincludeProvider, SearchProvider, SearchResult, TextSearchProvider,
};
use crate::error::{BackendError, StorageResult};
use crate::tenant::TenantContext;
use crate::types::{
CursorValue, IncludeDirective, Page, PageCursor, PageInfo, Pagination, SearchQuery,
StoredResource,
};
use super::backend::ElasticsearchBackend;
use super::schema;
use super::search::fts;
use super::search::query_builder::{EsQueryBuilder, build_count_query};
fn internal_error(message: String) -> crate::error::StorageError {
crate::error::StorageError::Backend(BackendError::Internal {
backend_name: "elasticsearch".to_string(),
message,
source: None,
})
}
#[async_trait]
impl SearchProvider for ElasticsearchBackend {
async fn search(
&self,
tenant: &TenantContext,
query: &SearchQuery,
) -> StorageResult<SearchResult> {
let tenant_id = tenant.tenant_id().as_str();
let resource_type = &query.resource_type;
let index = self.index_name(tenant_id, resource_type);
let builder = EsQueryBuilder::new(tenant_id, resource_type, index.clone());
let es_query = builder.build(query);
let response = self
.client()
.search(SearchParts::Index(&[&index]))
.body(es_query.body)
.send()
.await;
let response = match response {
Ok(r) => r,
Err(e) => {
tracing::debug!("ES search failed (index may not exist): {}", e);
return Ok(SearchResult::new(Page::new(vec![], PageInfo::end())));
}
};
if !response.status_code().is_success() {
let body = response.text().await.unwrap_or_default();
if body.contains("index_not_found_exception") {
return Ok(SearchResult::new(Page::new(vec![], PageInfo::end())));
}
return Err(internal_error(format!("Search failed: {}", body)));
}
let body: Value = response
.json()
.await
.map_err(|e| internal_error(format!("Failed to parse search response: {}", e)))?;
let hits = body
.get("hits")
.and_then(|h| h.get("hits"))
.and_then(|h| h.as_array())
.cloned()
.unwrap_or_default();
let total = body
.get("hits")
.and_then(|h| h.get("total"))
.and_then(|t| t.get("value"))
.and_then(|v| v.as_u64());
let count = query.count.unwrap_or(20) as usize;
let mut resources = Vec::new();
let mut last_sort: Option<Vec<Value>> = None;
let mut last_resource_id = String::new();
for hit in &hits {
let source = match hit.get("_source") {
Some(s) => s,
None => continue,
};
if source
.get("is_deleted")
.and_then(|v| v.as_bool())
.unwrap_or(false)
{
continue;
}
if let Some(stored) = parse_hit_to_stored_resource(source, tenant)? {
last_resource_id = stored.id().to_string();
resources.push(stored);
}
if let Some(sort) = hit.get("sort") {
last_sort = sort.as_array().cloned();
}
}
let has_next = resources.len() >= count;
let next_cursor = if has_next {
last_sort.as_ref().map(|sort_values| {
let cursor_values: Vec<CursorValue> = sort_values
.iter()
.take(sort_values.len().saturating_sub(1)) .map(|v| {
if let Some(s) = v.as_str() {
CursorValue::String(s.to_string())
} else if let Some(n) = v.as_i64() {
CursorValue::Number(n)
} else if let Some(b) = v.as_bool() {
CursorValue::Boolean(b)
} else if v.is_null() {
CursorValue::Null
} else {
CursorValue::String(v.to_string())
}
})
.collect();
PageCursor::new(cursor_values, &last_resource_id).encode()
})
} else {
None
};
let page_info = PageInfo {
next_cursor,
previous_cursor: None,
total,
has_next,
has_previous: query.cursor.is_some() || query.offset.unwrap_or(0) > 0,
};
let page = Page::new(resources, page_info);
let mut result = SearchResult::new(page);
if let Some(t) = total {
result = result.with_total(t);
}
if !query.includes.is_empty() {
let include_directives: Vec<IncludeDirective> = query
.includes
.iter()
.filter(|i| i.include_type == crate::types::IncludeType::Include)
.cloned()
.collect();
if !include_directives.is_empty() {
let included = self
.resolve_includes(tenant, &result.resources.items, &include_directives)
.await?;
result = result.with_included(included);
}
}
Ok(result)
}
async fn search_count(
&self,
tenant: &TenantContext,
query: &SearchQuery,
) -> StorageResult<u64> {
let tenant_id = tenant.tenant_id().as_str();
let resource_type = &query.resource_type;
let index = self.index_name(tenant_id, resource_type);
let count_body = build_count_query(tenant_id, resource_type, query);
let response = self
.client()
.count(elasticsearch::CountParts::Index(&[&index]))
.body(count_body)
.send()
.await;
match response {
Ok(resp) if resp.status_code().is_success() => {
let body: Value = resp.json().await.unwrap_or_default();
Ok(body.get("count").and_then(|c| c.as_u64()).unwrap_or(0))
}
_ => Ok(0),
}
}
}
#[async_trait]
impl TextSearchProvider for ElasticsearchBackend {
async fn search_text(
&self,
tenant: &TenantContext,
resource_type: &str,
text: &str,
pagination: &Pagination,
) -> StorageResult<SearchResult> {
let tenant_id = tenant.tenant_id().as_str();
let index = self.index_name(tenant_id, resource_type);
schema::ensure_index(self, tenant_id, resource_type).await?;
let body = json!({
"query": {
"bool": {
"must": [fts::build_narrative_query(text)],
"filter": [
{ "term": { "tenant_id": tenant_id } },
{ "term": { "is_deleted": false } }
]
}
},
"size": pagination.count,
"track_total_hits": true,
"sort": [
{ "_score": { "order": "desc" } },
{ "resource_id": { "order": "asc" } }
]
});
execute_text_search(self, &index, body, tenant).await
}
async fn search_content(
&self,
tenant: &TenantContext,
resource_type: &str,
content: &str,
pagination: &Pagination,
) -> StorageResult<SearchResult> {
let tenant_id = tenant.tenant_id().as_str();
let index = self.index_name(tenant_id, resource_type);
schema::ensure_index(self, tenant_id, resource_type).await?;
let body = json!({
"query": {
"bool": {
"must": [fts::build_content_query(content)],
"filter": [
{ "term": { "tenant_id": tenant_id } },
{ "term": { "is_deleted": false } }
]
}
},
"size": pagination.count,
"track_total_hits": true,
"sort": [
{ "_score": { "order": "desc" } },
{ "resource_id": { "order": "asc" } }
]
});
execute_text_search(self, &index, body, tenant).await
}
}
async fn execute_text_search(
backend: &ElasticsearchBackend,
index: &str,
body: Value,
tenant: &TenantContext,
) -> StorageResult<SearchResult> {
let response = backend
.client()
.search(SearchParts::Index(&[index]))
.body(body)
.send()
.await;
let response = match response {
Ok(r) => r,
Err(_) => {
return Ok(SearchResult::new(Page::new(vec![], PageInfo::end())));
}
};
if !response.status_code().is_success() {
let body = response.text().await.unwrap_or_default();
if body.contains("index_not_found_exception") {
return Ok(SearchResult::new(Page::new(vec![], PageInfo::end())));
}
return Err(internal_error(format!("Text search failed: {}", body)));
}
let body: Value = response
.json()
.await
.map_err(|e| internal_error(format!("Failed to parse response: {}", e)))?;
let hits = body
.get("hits")
.and_then(|h| h.get("hits"))
.and_then(|h| h.as_array())
.cloned()
.unwrap_or_default();
let total = body
.get("hits")
.and_then(|h| h.get("total"))
.and_then(|t| t.get("value"))
.and_then(|v| v.as_u64());
let mut resources = Vec::new();
for hit in &hits {
if let Some(source) = hit.get("_source") {
if let Some(stored) = parse_hit_to_stored_resource(source, tenant)? {
resources.push(stored);
}
}
}
let page = Page::new(resources, PageInfo::end());
let mut result = SearchResult::new(page);
if let Some(t) = total {
result = result.with_total(t);
}
Ok(result)
}
#[async_trait]
impl IncludeProvider for ElasticsearchBackend {
async fn resolve_includes(
&self,
tenant: &TenantContext,
resources: &[StoredResource],
includes: &[IncludeDirective],
) -> StorageResult<Vec<StoredResource>> {
let mut included = Vec::new();
for directive in includes {
for resource in resources {
let content = resource.content();
let search_param = &directive.search_param;
let references = extract_references(content, search_param);
for (ref_type, ref_id) in references {
if let Some(ref target_type) = directive.target_type {
if ref_type != *target_type {
continue;
}
}
if let Some(stored) = self.read(tenant, &ref_type, &ref_id).await? {
if !included.iter().any(|r: &StoredResource| {
r.resource_type() == stored.resource_type() && r.id() == stored.id()
}) {
included.push(stored);
}
}
}
}
}
Ok(included)
}
}
#[async_trait]
impl RevincludeProvider for ElasticsearchBackend {
async fn resolve_revincludes(
&self,
tenant: &TenantContext,
resources: &[StoredResource],
revincludes: &[IncludeDirective],
) -> StorageResult<Vec<StoredResource>> {
let mut result = Vec::new();
for directive in revincludes {
let source_type = &directive.source_type;
if source_type.is_empty() {
continue;
}
for resource in resources {
let reference_value = format!("{}/{}", resource.resource_type(), resource.id());
let query =
SearchQuery::new(source_type).with_parameter(crate::types::SearchParameter {
name: directive.search_param.clone(),
param_type: crate::types::SearchParamType::Reference,
modifier: None,
values: vec![crate::types::SearchValue::eq(&reference_value)],
chain: vec![],
components: vec![],
});
let search_result = self.search(tenant, &query).await?;
for stored in search_result.resources.items {
if !result.iter().any(|r: &StoredResource| {
r.resource_type() == stored.resource_type() && r.id() == stored.id()
}) {
result.push(stored);
}
}
}
}
Ok(result)
}
}
fn parse_hit_to_stored_resource(
source: &Value,
tenant: &TenantContext,
) -> StorageResult<Option<StoredResource>> {
let resource_type = match source.get("resource_type").and_then(|v| v.as_str()) {
Some(rt) => rt,
None => return Ok(None),
};
let resource_id = match source.get("resource_id").and_then(|v| v.as_str()) {
Some(id) => id,
None => return Ok(None),
};
let version_id = source
.get("version_id")
.and_then(|v| v.as_str())
.unwrap_or("1");
let content = source.get("content").cloned().unwrap_or_else(|| json!({}));
let fhir_version_str = source
.get("fhir_version")
.and_then(|v| v.as_str())
.unwrap_or("4.0");
let fhir_version =
helios_fhir::FhirVersion::from_mime_param(fhir_version_str).unwrap_or_default();
let last_updated = source
.get("last_updated")
.and_then(|v| v.as_str())
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(chrono::Utc::now);
Ok(Some(StoredResource::from_storage(
resource_type,
resource_id,
version_id,
tenant.tenant_id().clone(),
content,
last_updated,
last_updated,
None,
fhir_version,
)))
}
fn extract_references(content: &Value, param_name: &str) -> Vec<(String, String)> {
let mut refs = Vec::new();
if let Some(obj) = content.as_object() {
if let Some(ref_value) = obj.get(param_name) {
extract_reference_from_value(ref_value, &mut refs);
}
for (_key, value) in obj {
if let Some(ref_obj) = value.as_object() {
if let Some(reference) = ref_obj.get("reference").and_then(|r| r.as_str()) {
if let Some((rt, id)) = parse_reference_string(reference) {
refs.push((rt, id));
}
}
}
if let Some(arr) = value.as_array() {
for item in arr {
if let Some(ref_obj) = item.as_object() {
if let Some(reference) = ref_obj.get("reference").and_then(|r| r.as_str()) {
if let Some((rt, id)) = parse_reference_string(reference) {
refs.push((rt, id));
}
}
}
}
}
}
}
refs
}
fn extract_reference_from_value(value: &Value, refs: &mut Vec<(String, String)>) {
if let Some(obj) = value.as_object() {
if let Some(reference) = obj.get("reference").and_then(|r| r.as_str()) {
if let Some((rt, id)) = parse_reference_string(reference) {
refs.push((rt, id));
}
}
} else if let Some(arr) = value.as_array() {
for item in arr {
extract_reference_from_value(item, refs);
}
}
}
fn parse_reference_string(reference: &str) -> Option<(String, String)> {
if let Some((type_part, id_part)) = reference.rsplit_once('/') {
let resource_type = type_part.rsplit('/').next().unwrap_or(type_part);
if resource_type
.chars()
.next()
.map(|c| c.is_uppercase())
.unwrap_or(false)
{
return Some((resource_type.to_string(), id_part.to_string()));
}
}
None
}