1use async_trait::async_trait;
4use chrono::Utc;
5use helios_fhir::FhirVersion;
6use rusqlite::{ToSql, params};
7use serde_json::Value;
8
9use crate::core::history::{
10 DifferentialHistoryProvider, HistoryEntry, HistoryMethod, HistoryPage, HistoryParams,
11 InstanceHistoryProvider, SystemHistoryProvider, TypeHistoryProvider,
12};
13use crate::core::transaction::{
14 BundleEntry, BundleEntryResult, BundleMethod, BundleProvider, BundleResult, BundleType,
15};
16use crate::core::{
17 ConditionalCreateResult, ConditionalDeleteResult, ConditionalStorage, ConditionalUpdateResult,
18 PurgableStorage, ResourceStorage, SearchProvider, VersionedStorage,
19};
20use crate::error::TransactionError;
21use crate::error::{BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult};
22use crate::search::extractor::ExtractedValue;
23use crate::search::loader::SearchParameterLoader;
24use crate::search::registry::SearchParameterStatus;
25use crate::search::reindex::{ReindexableStorage, ResourcePage};
26use crate::tenant::TenantContext;
27use crate::types::Pagination;
28use crate::types::{CursorValue, Page, PageCursor, PageInfo, StoredResource};
29use crate::types::{SearchParamType, SearchParameter, SearchQuery, SearchValue};
30
31use super::SqliteBackend;
32use super::search::writer::{SqlValue, SqliteSearchIndexWriter};
33
34fn internal_error(message: String) -> StorageError {
35 StorageError::Backend(BackendError::Internal {
36 backend_name: "sqlite".to_string(),
37 message,
38 source: None,
39 })
40}
41
42fn serialization_error(message: String) -> StorageError {
43 StorageError::Backend(BackendError::SerializationError { message })
44}
45
46fn extract_part_value(part: &Value) -> Option<Value> {
52 part.as_object()?.iter().find_map(|(k, v)| {
53 let suffix = k.strip_prefix("value")?;
54 suffix
55 .chars()
56 .next()?
57 .is_ascii_uppercase()
58 .then(|| v.clone())
59 })
60}
61
62#[async_trait]
63impl ResourceStorage for SqliteBackend {
64 fn backend_name(&self) -> &'static str {
65 "sqlite"
66 }
67
68 fn sof_runner(&self) -> Option<std::sync::Arc<dyn crate::core::sof_runner::SofRunner>> {
69 use crate::sof::sqlite::SqliteInDbRunner;
70 Some(std::sync::Arc::new(SqliteInDbRunner::new(self.pool())))
71 }
72
73 async fn create(
74 &self,
75 tenant: &TenantContext,
76 resource_type: &str,
77 resource: Value,
78 fhir_version: FhirVersion,
79 ) -> StorageResult<StoredResource> {
80 let conn = self.get_connection()?;
81 let tenant_id = tenant.tenant_id().as_str();
82
83 let id = resource
85 .get("id")
86 .and_then(|v| v.as_str())
87 .map(String::from)
88 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
89
90 let exists: bool = conn
92 .query_row(
93 "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
94 params![tenant_id, resource_type, id],
95 |_| Ok(true),
96 )
97 .unwrap_or(false);
98
99 if exists {
100 return Err(StorageError::Resource(ResourceError::AlreadyExists {
101 resource_type: resource_type.to_string(),
102 id: id.clone(),
103 }));
104 }
105
106 let mut resource = resource;
108 if let Some(obj) = resource.as_object_mut() {
109 obj.insert(
110 "resourceType".to_string(),
111 Value::String(resource_type.to_string()),
112 );
113 obj.insert("id".to_string(), Value::String(id.clone()));
114 }
115
116 let data = serde_json::to_vec(&resource)
118 .map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
119
120 let now = Utc::now();
121 let last_updated = now.to_rfc3339();
122 let version_id = "1";
123 let fhir_version_str = fhir_version.as_mime_param();
124
125 conn.execute(
127 "INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
128 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
129 params![tenant_id, resource_type, id, version_id, data, last_updated, fhir_version_str],
130 )
131 .map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
132
133 conn.execute(
135 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
136 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
137 params![tenant_id, resource_type, id, version_id, data, last_updated, fhir_version_str],
138 )
139 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
140
141 self.index_resource(&conn, tenant_id, resource_type, &id, &resource)?;
143
144 if resource_type == "SearchParameter" {
146 self.handle_search_parameter_create(&resource)?;
147 }
148
149 Ok(StoredResource::from_storage(
151 resource_type,
152 &id,
153 version_id,
154 tenant.tenant_id().clone(),
155 resource,
156 now,
157 now,
158 None,
159 fhir_version,
160 ))
161 }
162
163 async fn create_or_update(
164 &self,
165 tenant: &TenantContext,
166 resource_type: &str,
167 id: &str,
168 resource: Value,
169 fhir_version: FhirVersion,
170 ) -> StorageResult<(StoredResource, bool)> {
171 let existing = self.read(tenant, resource_type, id).await?;
173
174 if let Some(current) = existing {
175 let updated = self.update(tenant, ¤t, resource).await?;
177 Ok((updated, false))
178 } else {
179 let mut resource = resource;
181 if let Some(obj) = resource.as_object_mut() {
182 obj.insert("id".to_string(), Value::String(id.to_string()));
183 }
184 let created = self
185 .create(tenant, resource_type, resource, fhir_version)
186 .await?;
187 Ok((created, true))
188 }
189 }
190
191 async fn read(
192 &self,
193 tenant: &TenantContext,
194 resource_type: &str,
195 id: &str,
196 ) -> StorageResult<Option<StoredResource>> {
197 let conn = self.get_connection()?;
198 let tenant_id = tenant.tenant_id().as_str();
199
200 let result = conn.query_row(
201 "SELECT version_id, data, last_updated, is_deleted, deleted_at, fhir_version
202 FROM resources
203 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
204 params![tenant_id, resource_type, id],
205 |row| {
206 let version_id: String = row.get(0)?;
207 let data: Vec<u8> = row.get(1)?;
208 let last_updated: String = row.get(2)?;
209 let is_deleted: i32 = row.get(3)?;
210 let deleted_at: Option<String> = row.get(4)?;
211 let fhir_version: String = row.get(5)?;
212 Ok((
213 version_id,
214 data,
215 last_updated,
216 is_deleted,
217 deleted_at,
218 fhir_version,
219 ))
220 },
221 );
222
223 match result {
224 Ok((version_id, data, last_updated, is_deleted, deleted_at, fhir_version_str)) => {
225 if is_deleted != 0 {
227 let deleted_at = deleted_at.and_then(|s| {
228 chrono::DateTime::parse_from_rfc3339(&s)
229 .ok()
230 .map(|dt| dt.with_timezone(&Utc))
231 });
232 return Err(StorageError::Resource(ResourceError::Gone {
233 resource_type: resource_type.to_string(),
234 id: id.to_string(),
235 deleted_at,
236 }));
237 }
238
239 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
240 serialization_error(format!("Failed to deserialize resource: {}", e))
241 })?;
242
243 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated)
244 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
245 .with_timezone(&Utc);
246
247 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
249 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
250
251 Ok(Some(StoredResource::from_storage(
252 resource_type,
253 id,
254 version_id,
255 tenant.tenant_id().clone(),
256 json_data,
257 last_updated,
258 last_updated,
259 None,
260 fhir_version,
261 )))
262 }
263 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
264 Err(e) => Err(internal_error(format!("Failed to read resource: {}", e))),
265 }
266 }
267
268 async fn update(
269 &self,
270 tenant: &TenantContext,
271 current: &StoredResource,
272 resource: Value,
273 ) -> StorageResult<StoredResource> {
274 let conn = self.get_connection()?;
275 let tenant_id = tenant.tenant_id().as_str();
276 let resource_type = current.resource_type();
277 let id = current.id();
278
279 let actual_version: Result<String, _> = conn.query_row(
281 "SELECT version_id FROM resources
282 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
283 params![tenant_id, resource_type, id],
284 |row| row.get(0),
285 );
286
287 let actual_version = match actual_version {
288 Ok(v) => v,
289 Err(rusqlite::Error::QueryReturnedNoRows) => {
290 return Err(StorageError::Resource(ResourceError::NotFound {
291 resource_type: resource_type.to_string(),
292 id: id.to_string(),
293 }));
294 }
295 Err(e) => {
296 return Err(internal_error(format!(
297 "Failed to get current version: {}",
298 e
299 )));
300 }
301 };
302
303 if actual_version != current.version_id() {
305 return Err(StorageError::Concurrency(
306 ConcurrencyError::VersionConflict {
307 resource_type: resource_type.to_string(),
308 id: id.to_string(),
309 expected_version: current.version_id().to_string(),
310 actual_version,
311 },
312 ));
313 }
314
315 let new_version: u64 = actual_version.parse().unwrap_or(0) + 1;
317 let new_version_str = new_version.to_string();
318
319 let mut resource = resource;
321 if let Some(obj) = resource.as_object_mut() {
322 obj.insert(
323 "resourceType".to_string(),
324 Value::String(resource_type.to_string()),
325 );
326 obj.insert("id".to_string(), Value::String(id.to_string()));
327 }
328
329 let data = serde_json::to_vec(&resource)
331 .map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
332
333 let now = Utc::now();
334 let last_updated = now.to_rfc3339();
335
336 conn.execute(
338 "UPDATE resources SET version_id = ?1, data = ?2, last_updated = ?3
339 WHERE tenant_id = ?4 AND resource_type = ?5 AND id = ?6",
340 params![
341 new_version_str,
342 data,
343 last_updated,
344 tenant_id,
345 resource_type,
346 id
347 ],
348 )
349 .map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
350
351 let fhir_version_str = current.fhir_version().as_mime_param();
353 conn.execute(
354 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
355 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
356 params![tenant_id, resource_type, id, new_version_str, data, last_updated, fhir_version_str],
357 )
358 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
359
360 self.delete_search_index(&conn, tenant_id, resource_type, id)?;
362 self.index_resource(&conn, tenant_id, resource_type, id, &resource)?;
363
364 if resource_type == "SearchParameter" {
366 self.handle_search_parameter_update(current.content(), &resource)?;
367 }
368
369 Ok(StoredResource::from_storage(
370 resource_type,
371 id,
372 new_version_str,
373 tenant.tenant_id().clone(),
374 resource,
375 now,
376 now,
377 None,
378 current.fhir_version(),
379 ))
380 }
381
382 async fn delete(
383 &self,
384 tenant: &TenantContext,
385 resource_type: &str,
386 id: &str,
387 ) -> StorageResult<()> {
388 let conn = self.get_connection()?;
389 let tenant_id = tenant.tenant_id().as_str();
390
391 let result: Result<(String, Vec<u8>, String), _> = conn.query_row(
393 "SELECT version_id, data, fhir_version FROM resources
394 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
395 params![tenant_id, resource_type, id],
396 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
397 );
398
399 let (current_version, data, fhir_version_str) = match result {
400 Ok(v) => v,
401 Err(rusqlite::Error::QueryReturnedNoRows) => {
402 return Err(StorageError::Resource(ResourceError::NotFound {
403 resource_type: resource_type.to_string(),
404 id: id.to_string(),
405 }));
406 }
407 Err(e) => {
408 return Err(internal_error(format!("Failed to check resource: {}", e)));
409 }
410 };
411
412 let now = Utc::now();
413 let deleted_at = now.to_rfc3339();
414
415 let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
417 let new_version_str = new_version.to_string();
418
419 conn.execute(
421 "UPDATE resources SET is_deleted = 1, deleted_at = ?1, version_id = ?2, last_updated = ?1
422 WHERE tenant_id = ?3 AND resource_type = ?4 AND id = ?5",
423 params![deleted_at, new_version_str, tenant_id, resource_type, id],
424 )
425 .map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
426
427 conn.execute(
429 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
430 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, ?7)",
431 params![tenant_id, resource_type, id, new_version_str, data, deleted_at, fhir_version_str],
432 )
433 .map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
434
435 if !self.is_search_offloaded() {
437 conn.execute(
438 "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
439 params![tenant_id, resource_type, id],
440 )
441 .map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
442 }
443
444 if resource_type == "SearchParameter" {
446 if let Ok(resource_json) = serde_json::from_slice::<Value>(&data) {
447 self.handle_search_parameter_delete(&resource_json)?;
448 }
449 }
450
451 Ok(())
452 }
453
454 async fn count(
455 &self,
456 tenant: &TenantContext,
457 resource_type: Option<&str>,
458 ) -> StorageResult<u64> {
459 let conn = self.get_connection()?;
460 let tenant_id = tenant.tenant_id().as_str();
461
462 let count: i64 = if let Some(rt) = resource_type {
463 conn.query_row(
464 "SELECT COUNT(*) FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0",
465 params![tenant_id, rt],
466 |row| row.get(0),
467 )
468 } else {
469 conn.query_row(
470 "SELECT COUNT(*) FROM resources WHERE tenant_id = ?1 AND is_deleted = 0",
471 params![tenant_id],
472 |row| row.get(0),
473 )
474 }
475 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
476
477 Ok(count as u64)
478 }
479}
480
481impl SqliteBackend {
483 pub(crate) fn index_resource(
490 &self,
491 conn: &rusqlite::Connection,
492 tenant_id: &str,
493 resource_type: &str,
494 resource_id: &str,
495 resource: &Value,
496 ) -> StorageResult<()> {
497 if self.is_search_offloaded() {
499 return Ok(());
500 }
501
502 match self.index_resource_dynamic(conn, tenant_id, resource_type, resource_id, resource) {
504 Ok(count) => {
505 tracing::debug!(
506 "Dynamically indexed {} values for {}/{}",
507 count,
508 resource_type,
509 resource_id
510 );
511 }
512 Err(e) => {
513 tracing::warn!(
514 "Dynamic extraction failed for {}/{}: {}. Using minimal fallback (_id, _lastUpdated only).",
515 resource_type,
516 resource_id,
517 e
518 );
519 self.index_minimal_fallback(conn, tenant_id, resource_type, resource_id, resource)?;
521 }
522 }
523
524 self.index_fts_content(conn, tenant_id, resource_type, resource_id, resource)?;
526
527 Ok(())
528 }
529
530 fn index_fts_content(
534 &self,
535 conn: &rusqlite::Connection,
536 tenant_id: &str,
537 resource_type: &str,
538 resource_id: &str,
539 resource: &Value,
540 ) -> StorageResult<()> {
541 use super::search::fts::extract_searchable_content;
542
543 let fts_exists: bool = conn
545 .query_row(
546 "SELECT 1 FROM sqlite_master WHERE type='table' AND name='resource_fts'",
547 [],
548 |_| Ok(true),
549 )
550 .unwrap_or(false);
551
552 if !fts_exists {
553 return Ok(());
555 }
556
557 let content = extract_searchable_content(resource);
559
560 if content.is_empty() {
561 return Ok(());
562 }
563
564 conn.execute(
566 "INSERT INTO resource_fts (resource_id, resource_type, tenant_id, narrative_text, full_content)
567 VALUES (?1, ?2, ?3, ?4, ?5)",
568 params![
569 resource_id,
570 resource_type,
571 tenant_id,
572 content.narrative,
573 content.full_content
574 ],
575 )
576 .map_err(|e| internal_error(format!("Failed to insert FTS content: {}", e)))?;
577
578 Ok(())
579 }
580
581 fn index_resource_dynamic(
585 &self,
586 conn: &rusqlite::Connection,
587 tenant_id: &str,
588 resource_type: &str,
589 resource_id: &str,
590 resource: &Value,
591 ) -> StorageResult<usize> {
592 let values = self
594 .search_extractor()
595 .extract(resource, resource_type)
596 .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
597
598 let mut count = 0;
599 for value in values {
600 self.write_index_entry(conn, tenant_id, resource_type, resource_id, &value)?;
601 count += 1;
602 }
603
604 count +=
606 self.index_contained_resources(conn, tenant_id, resource_type, resource_id, resource)?;
607
608 Ok(count)
609 }
610
611 fn write_index_entry(
613 &self,
614 conn: &rusqlite::Connection,
615 tenant_id: &str,
616 resource_type: &str,
617 resource_id: &str,
618 value: &ExtractedValue,
619 ) -> StorageResult<()> {
620 use crate::search::converters::IndexValue;
621
622 let normalized_value = match &value.value {
624 IndexValue::Date {
625 value: date_str,
626 precision,
627 } => {
628 let normalized_date = Self::normalize_date_for_sqlite(date_str);
629 let mut normalized = value.clone();
630 normalized.value = IndexValue::Date {
631 value: normalized_date,
632 precision: *precision,
633 };
634 Some(normalized)
635 }
636 _ => None,
637 };
638
639 let value_to_use = normalized_value.as_ref().unwrap_or(value);
640 let sql_params = SqliteSearchIndexWriter::to_sql_params(
641 tenant_id,
642 resource_type,
643 resource_id,
644 value_to_use,
645 );
646
647 let param_refs: Vec<&dyn ToSql> = sql_params
649 .iter()
650 .map(|p| self.sql_value_to_ref(p))
651 .collect();
652
653 conn.execute(SqliteSearchIndexWriter::insert_sql(), param_refs.as_slice())
654 .map_err(|e| internal_error(format!("Failed to insert search index entry: {}", e)))?;
655
656 Ok(())
657 }
658
659 fn index_contained_resources(
664 &self,
665 conn: &rusqlite::Connection,
666 tenant_id: &str,
667 container_type: &str,
668 container_id: &str,
669 resource: &Value,
670 ) -> StorageResult<usize> {
671 let mut count = 0;
672 let container = (container_type, container_id);
673 for contained in self.search_extractor().extract_contained(resource) {
674 for value in &contained.values {
675 self.write_contained_index_entry(
676 conn,
677 tenant_id,
678 container,
679 (&contained.contained_type, &contained.local_id),
680 value,
681 )?;
682 count += 1;
683 }
684 }
685 Ok(count)
686 }
687
688 fn write_contained_index_entry(
694 &self,
695 conn: &rusqlite::Connection,
696 tenant_id: &str,
697 container: (&str, &str),
698 contained: (&str, &str),
699 value: &ExtractedValue,
700 ) -> StorageResult<()> {
701 use crate::search::converters::IndexValue;
702
703 let (container_type, container_id) = container;
704 let (contained_type, contained_local_id) = contained;
705
706 let normalized_value = match &value.value {
707 IndexValue::Date {
708 value: date_str,
709 precision,
710 } => {
711 let mut normalized = value.clone();
712 normalized.value = IndexValue::Date {
713 value: Self::normalize_date_for_sqlite(date_str),
714 precision: *precision,
715 };
716 Some(normalized)
717 }
718 _ => None,
719 };
720 let value_to_use = normalized_value.as_ref().unwrap_or(value);
721
722 let mut sql_params = SqliteSearchIndexWriter::to_sql_params(
723 tenant_id,
724 container_type,
725 container_id,
726 value_to_use,
727 );
728 sql_params.push(SqlValue::Int(1));
730 sql_params.push(SqlValue::String(contained_type.to_string()));
731 sql_params.push(SqlValue::String(contained_local_id.to_string()));
732
733 let param_refs: Vec<&dyn ToSql> = sql_params
734 .iter()
735 .map(|p| self.sql_value_to_ref(p))
736 .collect();
737
738 conn.execute(
739 SqliteSearchIndexWriter::insert_contained_sql(),
740 param_refs.as_slice(),
741 )
742 .map_err(|e| {
743 internal_error(format!(
744 "Failed to insert contained search index entry: {}",
745 e
746 ))
747 })?;
748
749 Ok(())
750 }
751
752 fn normalize_date_for_sqlite(value: &str) -> String {
756 if value.contains('T') {
757 value.to_string()
758 } else if value.len() == 10 {
759 format!("{}T00:00:00", value)
761 } else if value.len() == 7 {
762 format!("{}-01T00:00:00", value)
764 } else if value.len() == 4 {
765 format!("{}-01-01T00:00:00", value)
767 } else {
768 value.to_string()
769 }
770 }
771
772 fn sql_value_to_ref<'a>(&'a self, value: &'a super::search::writer::SqlValue) -> &'a dyn ToSql {
774 use super::search::writer::SqlValue;
775 match value {
776 SqlValue::String(s) => s,
777 SqlValue::OptString(opt) => opt,
778 SqlValue::Int(i) => i,
779 SqlValue::OptInt(opt) => opt,
780 SqlValue::Float(f) => f,
781 SqlValue::Null => &rusqlite::types::Null,
782 }
783 }
784
785 pub(crate) fn delete_search_index(
787 &self,
788 conn: &rusqlite::Connection,
789 tenant_id: &str,
790 resource_type: &str,
791 resource_id: &str,
792 ) -> StorageResult<()> {
793 if self.is_search_offloaded() {
795 return Ok(());
796 }
797
798 conn.execute(
800 "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
801 params![tenant_id, resource_type, resource_id],
802 )
803 .map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
804
805 let _ = conn.execute(
807 "DELETE FROM resource_fts WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
808 params![tenant_id, resource_type, resource_id],
809 );
810
811 Ok(())
812 }
813
814 fn index_minimal_fallback(
820 &self,
821 conn: &rusqlite::Connection,
822 tenant_id: &str,
823 resource_type: &str,
824 resource_id: &str,
825 resource: &Value,
826 ) -> StorageResult<()> {
827 if let Some(id) = resource.get("id").and_then(|v| v.as_str()) {
829 self.insert_token_index(conn, tenant_id, resource_type, resource_id, "_id", None, id)?;
830 }
831
832 if let Some(last_updated) = resource
834 .get("meta")
835 .and_then(|m| m.get("lastUpdated"))
836 .and_then(|v| v.as_str())
837 {
838 self.insert_date_index(
839 conn,
840 tenant_id,
841 resource_type,
842 resource_id,
843 "_lastUpdated",
844 last_updated,
845 )?;
846 }
847
848 Ok(())
849 }
850
851 #[allow(clippy::too_many_arguments)]
853 fn insert_token_index(
854 &self,
855 conn: &rusqlite::Connection,
856 tenant_id: &str,
857 resource_type: &str,
858 resource_id: &str,
859 param_name: &str,
860 system: Option<&str>,
861 code: &str,
862 ) -> StorageResult<()> {
863 conn.execute(
864 "INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_token_system, value_token_code)
865 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
866 params![tenant_id, resource_type, resource_id, param_name, system, code],
867 )
868 .map_err(|e| internal_error(format!("Failed to insert token index: {}", e)))?;
869 Ok(())
870 }
871
872 fn insert_date_index(
874 &self,
875 conn: &rusqlite::Connection,
876 tenant_id: &str,
877 resource_type: &str,
878 resource_id: &str,
879 param_name: &str,
880 value: &str,
881 ) -> StorageResult<()> {
882 let normalized = if value.contains('T') {
885 value.to_string()
886 } else if value.len() == 10 {
887 format!("{}T00:00:00", value)
889 } else if value.len() == 7 {
890 format!("{}-01T00:00:00", value)
892 } else if value.len() == 4 {
893 format!("{}-01-01T00:00:00", value)
895 } else {
896 value.to_string()
897 };
898
899 conn.execute(
900 "INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_date)
901 VALUES (?1, ?2, ?3, ?4, ?5)",
902 params![tenant_id, resource_type, resource_id, param_name, normalized],
903 )
904 .map_err(|e| internal_error(format!("Failed to insert date index: {}", e)))?;
905 Ok(())
906 }
907}
908
909impl SqliteBackend {
911 fn handle_search_parameter_create(&self, resource: &Value) -> StorageResult<()> {
917 let loader = SearchParameterLoader::new(FhirVersion::default_enabled());
918
919 match loader.parse_resource(resource) {
920 Ok(def) => {
921 if def.status == SearchParameterStatus::Active {
923 let mut registry = self.search_registry().write();
924 if let Err(e) = registry.register(def) {
926 tracing::debug!("SearchParameter registration skipped: {}", e);
927 }
928 }
929 }
930 Err(e) => {
931 tracing::warn!("Failed to parse SearchParameter for registry: {}", e);
933 }
934 }
935
936 Ok(())
937 }
938
939 fn handle_search_parameter_update(
946 &self,
947 old_resource: &Value,
948 new_resource: &Value,
949 ) -> StorageResult<()> {
950 let loader = SearchParameterLoader::new(FhirVersion::default_enabled());
951
952 let old_def = loader.parse_resource(old_resource).ok();
953 let new_def = loader.parse_resource(new_resource).ok();
954
955 match (old_def, new_def) {
956 (Some(old), Some(new)) => {
957 let mut registry = self.search_registry().write();
958
959 if old.url != new.url {
961 let _ = registry.unregister(&old.url);
962 if new.status == SearchParameterStatus::Active {
963 let _ = registry.register(new);
964 }
965 } else if old.status != new.status {
966 if let Err(e) = registry.update_status(&new.url, new.status) {
968 tracing::debug!("SearchParameter status update skipped: {}", e);
969 }
970 } else {
971 let _ = registry.unregister(&old.url);
973 if new.status == SearchParameterStatus::Active {
974 let _ = registry.register(new);
975 }
976 }
977 }
978 (None, Some(new)) => {
979 if new.status == SearchParameterStatus::Active {
981 let mut registry = self.search_registry().write();
982 let _ = registry.register(new);
983 }
984 }
985 (Some(old), None) => {
986 let mut registry = self.search_registry().write();
988 let _ = registry.unregister(&old.url);
989 }
990 (None, None) => {
991 }
993 }
994
995 Ok(())
996 }
997
998 fn handle_search_parameter_delete(&self, resource: &Value) -> StorageResult<()> {
1003 if let Some(url) = resource.get("url").and_then(|v| v.as_str()) {
1004 let mut registry = self.search_registry().write();
1005 if let Err(e) = registry.unregister(url) {
1006 tracing::debug!("SearchParameter unregistration skipped: {}", e);
1007 }
1008 }
1009
1010 Ok(())
1011 }
1012}
1013
1014#[async_trait]
1015impl VersionedStorage for SqliteBackend {
1016 async fn vread(
1017 &self,
1018 tenant: &TenantContext,
1019 resource_type: &str,
1020 id: &str,
1021 version_id: &str,
1022 ) -> StorageResult<Option<StoredResource>> {
1023 let conn = self.get_connection()?;
1024 let tenant_id = tenant.tenant_id().as_str();
1025
1026 let result = conn.query_row(
1027 "SELECT data, last_updated, is_deleted, fhir_version
1028 FROM resource_history
1029 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND version_id = ?4",
1030 params![tenant_id, resource_type, id, version_id],
1031 |row| {
1032 let data: Vec<u8> = row.get(0)?;
1033 let last_updated: String = row.get(1)?;
1034 let is_deleted: i32 = row.get(2)?;
1035 let fhir_version: String = row.get(3)?;
1036 Ok((data, last_updated, is_deleted, fhir_version))
1037 },
1038 );
1039
1040 match result {
1041 Ok((data, last_updated, is_deleted, fhir_version_str)) => {
1042 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
1043 serialization_error(format!("Failed to deserialize resource: {}", e))
1044 })?;
1045
1046 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated)
1047 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
1048 .with_timezone(&Utc);
1049
1050 let deleted_at = if is_deleted != 0 {
1052 Some(last_updated)
1053 } else {
1054 None
1055 };
1056
1057 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1058 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1059
1060 Ok(Some(StoredResource::from_storage(
1061 resource_type,
1062 id,
1063 version_id,
1064 tenant.tenant_id().clone(),
1065 json_data,
1066 last_updated,
1067 last_updated,
1068 deleted_at,
1069 fhir_version,
1070 )))
1071 }
1072 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1073 Err(e) => Err(internal_error(format!("Failed to read version: {}", e))),
1074 }
1075 }
1076
1077 async fn update_with_match(
1078 &self,
1079 tenant: &TenantContext,
1080 resource_type: &str,
1081 id: &str,
1082 expected_version: &str,
1083 resource: Value,
1084 ) -> StorageResult<StoredResource> {
1085 let current = self.read(tenant, resource_type, id).await?.ok_or_else(|| {
1087 StorageError::Resource(ResourceError::NotFound {
1088 resource_type: resource_type.to_string(),
1089 id: id.to_string(),
1090 })
1091 })?;
1092
1093 if current.version_id() != expected_version {
1095 return Err(StorageError::Concurrency(
1096 ConcurrencyError::VersionConflict {
1097 resource_type: resource_type.to_string(),
1098 id: id.to_string(),
1099 expected_version: expected_version.to_string(),
1100 actual_version: current.version_id().to_string(),
1101 },
1102 ));
1103 }
1104
1105 self.update(tenant, ¤t, resource).await
1107 }
1108
1109 async fn delete_with_match(
1110 &self,
1111 tenant: &TenantContext,
1112 resource_type: &str,
1113 id: &str,
1114 expected_version: &str,
1115 ) -> StorageResult<()> {
1116 let conn = self.get_connection()?;
1117 let tenant_id = tenant.tenant_id().as_str();
1118
1119 let current_version: Result<String, _> = conn.query_row(
1121 "SELECT version_id FROM resources
1122 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
1123 params![tenant_id, resource_type, id],
1124 |row| row.get(0),
1125 );
1126
1127 let current_version = match current_version {
1128 Ok(v) => v,
1129 Err(rusqlite::Error::QueryReturnedNoRows) => {
1130 return Err(StorageError::Resource(ResourceError::NotFound {
1131 resource_type: resource_type.to_string(),
1132 id: id.to_string(),
1133 }));
1134 }
1135 Err(e) => {
1136 return Err(internal_error(format!(
1137 "Failed to get current version: {}",
1138 e
1139 )));
1140 }
1141 };
1142
1143 if current_version != expected_version {
1144 return Err(StorageError::Concurrency(
1145 ConcurrencyError::VersionConflict {
1146 resource_type: resource_type.to_string(),
1147 id: id.to_string(),
1148 expected_version: expected_version.to_string(),
1149 actual_version: current_version,
1150 },
1151 ));
1152 }
1153
1154 self.delete(tenant, resource_type, id).await
1156 }
1157
1158 async fn list_versions(
1159 &self,
1160 tenant: &TenantContext,
1161 resource_type: &str,
1162 id: &str,
1163 ) -> StorageResult<Vec<String>> {
1164 let conn = self.get_connection()?;
1165 let tenant_id = tenant.tenant_id().as_str();
1166
1167 let mut stmt = conn
1168 .prepare(
1169 "SELECT version_id FROM resource_history
1170 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3
1171 ORDER BY CAST(version_id AS INTEGER) ASC",
1172 )
1173 .map_err(|e| internal_error(format!("Failed to prepare query: {}", e)))?;
1174
1175 let versions = stmt
1176 .query_map(params![tenant_id, resource_type, id], |row| row.get(0))
1177 .map_err(|e| internal_error(format!("Failed to list versions: {}", e)))?
1178 .filter_map(|r| r.ok())
1179 .collect();
1180
1181 Ok(versions)
1182 }
1183}
1184
1185#[async_trait]
1186impl InstanceHistoryProvider for SqliteBackend {
1187 async fn history_instance(
1188 &self,
1189 tenant: &TenantContext,
1190 resource_type: &str,
1191 id: &str,
1192 params: &HistoryParams,
1193 ) -> StorageResult<HistoryPage> {
1194 let conn = self.get_connection()?;
1195 let tenant_id = tenant.tenant_id().as_str();
1196
1197 let mut sql = String::from(
1199 "SELECT version_id, data, last_updated, is_deleted, fhir_version
1200 FROM resource_history
1201 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1202 );
1203
1204 if !params.include_deleted {
1206 sql.push_str(" AND is_deleted = 0");
1207 }
1208
1209 if let Some(since) = ¶ms.since {
1211 sql.push_str(&format!(" AND last_updated >= '{}'", since.to_rfc3339()));
1212 }
1213
1214 if let Some(before) = ¶ms.before {
1216 sql.push_str(&format!(" AND last_updated < '{}'", before.to_rfc3339()));
1217 }
1218
1219 if let Some(cursor) = params.pagination.cursor_value() {
1221 if let Some(CursorValue::String(version_str)) = cursor.sort_values().first() {
1223 sql.push_str(&format!(
1225 " AND CAST(version_id AS INTEGER) < {}",
1226 version_str.parse::<i64>().unwrap_or(i64::MAX)
1227 ));
1228 }
1229 }
1230
1231 sql.push_str(" ORDER BY CAST(version_id AS INTEGER) DESC");
1233 sql.push_str(&format!(" LIMIT {}", params.pagination.count + 1)); let mut stmt = conn
1236 .prepare(&sql)
1237 .map_err(|e| internal_error(format!("Failed to prepare history query: {}", e)))?;
1238
1239 let rows = stmt
1240 .query_map(params![tenant_id, resource_type, id], |row| {
1241 let version_id: String = row.get(0)?;
1242 let data: Vec<u8> = row.get(1)?;
1243 let last_updated: String = row.get(2)?;
1244 let is_deleted: i32 = row.get(3)?;
1245 let fhir_version: String = row.get(4)?;
1246 Ok((version_id, data, last_updated, is_deleted, fhir_version))
1247 })
1248 .map_err(|e| internal_error(format!("Failed to query history: {}", e)))?;
1249
1250 let mut entries = Vec::new();
1251 let mut last_version: Option<String> = None;
1252
1253 for row in rows {
1254 let (version_id, data, last_updated_str, is_deleted, fhir_version_str) =
1255 row.map_err(|e| internal_error(format!("Failed to read history row: {}", e)))?;
1256
1257 if entries.len() >= params.pagination.count as usize {
1259 break;
1260 }
1261
1262 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
1263 serialization_error(format!("Failed to deserialize resource: {}", e))
1264 })?;
1265
1266 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated_str)
1267 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
1268 .with_timezone(&Utc);
1269
1270 let deleted_at = if is_deleted != 0 {
1271 Some(last_updated)
1272 } else {
1273 None
1274 };
1275
1276 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1277 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1278
1279 let resource = StoredResource::from_storage(
1280 resource_type,
1281 id,
1282 &version_id,
1283 tenant.tenant_id().clone(),
1284 json_data,
1285 last_updated,
1286 last_updated,
1287 deleted_at,
1288 fhir_version,
1289 );
1290
1291 let method = if is_deleted != 0 {
1293 HistoryMethod::Delete
1294 } else if version_id == "1" {
1295 HistoryMethod::Post
1296 } else {
1297 HistoryMethod::Put
1298 };
1299
1300 last_version = Some(version_id);
1301
1302 entries.push(HistoryEntry {
1303 resource,
1304 method,
1305 timestamp: last_updated,
1306 });
1307 }
1308
1309 let has_more = stmt
1311 .query_map(params![tenant_id, resource_type, id], |_| Ok(()))
1312 .map_err(|e| internal_error(format!("Failed to check for more results: {}", e)))?
1313 .count()
1314 > params.pagination.count as usize;
1315
1316 let page_info = if let (true, Some(version)) = (has_more, last_version) {
1318 let cursor = PageCursor::new(vec![CursorValue::String(version)], id.to_string());
1319 PageInfo::with_next(cursor)
1320 } else {
1321 PageInfo::end()
1322 };
1323
1324 Ok(Page::new(entries, page_info))
1325 }
1326
1327 async fn history_instance_count(
1328 &self,
1329 tenant: &TenantContext,
1330 resource_type: &str,
1331 id: &str,
1332 ) -> StorageResult<u64> {
1333 let conn = self.get_connection()?;
1334 let tenant_id = tenant.tenant_id().as_str();
1335
1336 let count: i64 = conn
1337 .query_row(
1338 "SELECT COUNT(*) FROM resource_history
1339 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1340 params![tenant_id, resource_type, id],
1341 |row| row.get(0),
1342 )
1343 .map_err(|e| internal_error(format!("Failed to count history: {}", e)))?;
1344
1345 Ok(count as u64)
1346 }
1347
1348 async fn delete_instance_history(
1359 &self,
1360 tenant: &TenantContext,
1361 resource_type: &str,
1362 id: &str,
1363 ) -> StorageResult<u64> {
1364 let conn = self.get_connection()?;
1365 let tenant_id = tenant.tenant_id().as_str();
1366
1367 let exists: bool = conn
1369 .query_row(
1370 "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1371 params![tenant_id, resource_type, id],
1372 |_| Ok(true),
1373 )
1374 .unwrap_or(false);
1375
1376 if !exists {
1377 return Err(StorageError::Resource(ResourceError::NotFound {
1378 resource_type: resource_type.to_string(),
1379 id: id.to_string(),
1380 }));
1381 }
1382
1383 let current_version: String = conn
1385 .query_row(
1386 "SELECT version_id FROM resources
1387 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1388 params![tenant_id, resource_type, id],
1389 |row| row.get(0),
1390 )
1391 .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
1392
1393 let deleted = conn
1396 .execute(
1397 "DELETE FROM resource_history
1398 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND version_id != ?4",
1399 params![tenant_id, resource_type, id, current_version],
1400 )
1401 .map_err(|e| internal_error(format!("Failed to delete history: {}", e)))?;
1402
1403 Ok(deleted as u64)
1404 }
1405
1406 async fn delete_version(
1412 &self,
1413 tenant: &TenantContext,
1414 resource_type: &str,
1415 id: &str,
1416 version_id: &str,
1417 ) -> StorageResult<()> {
1418 let conn = self.get_connection()?;
1419 let tenant_id = tenant.tenant_id().as_str();
1420
1421 let current_version: Result<String, _> = conn.query_row(
1423 "SELECT version_id FROM resources
1424 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1425 params![tenant_id, resource_type, id],
1426 |row| row.get(0),
1427 );
1428
1429 let current_version = match current_version {
1430 Ok(v) => v,
1431 Err(rusqlite::Error::QueryReturnedNoRows) => {
1432 return Err(StorageError::Resource(ResourceError::NotFound {
1433 resource_type: resource_type.to_string(),
1434 id: id.to_string(),
1435 }));
1436 }
1437 Err(e) => {
1438 return Err(internal_error(format!(
1439 "Failed to get current version: {}",
1440 e
1441 )));
1442 }
1443 };
1444
1445 if version_id == current_version {
1447 return Err(StorageError::Validation(
1448 crate::error::ValidationError::InvalidResource {
1449 message: format!(
1450 "Cannot delete current version {} of {}/{}. Use DELETE on the resource instead.",
1451 version_id, resource_type, id
1452 ),
1453 details: vec![],
1454 },
1455 ));
1456 }
1457
1458 let version_exists: bool = conn
1460 .query_row(
1461 "SELECT 1 FROM resource_history
1462 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND version_id = ?4",
1463 params![tenant_id, resource_type, id, version_id],
1464 |_| Ok(true),
1465 )
1466 .unwrap_or(false);
1467
1468 if !version_exists {
1469 return Err(StorageError::Resource(ResourceError::VersionNotFound {
1470 resource_type: resource_type.to_string(),
1471 id: id.to_string(),
1472 version_id: version_id.to_string(),
1473 }));
1474 }
1475
1476 conn.execute(
1478 "DELETE FROM resource_history
1479 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND version_id = ?4",
1480 params![tenant_id, resource_type, id, version_id],
1481 )
1482 .map_err(|e| internal_error(format!("Failed to delete version: {}", e)))?;
1483
1484 Ok(())
1485 }
1486}
1487
1488#[async_trait]
1489impl TypeHistoryProvider for SqliteBackend {
1490 async fn history_type(
1491 &self,
1492 tenant: &TenantContext,
1493 resource_type: &str,
1494 params: &HistoryParams,
1495 ) -> StorageResult<HistoryPage> {
1496 let conn = self.get_connection()?;
1497 let tenant_id = tenant.tenant_id().as_str();
1498
1499 let mut sql = String::from(
1501 "SELECT id, version_id, data, last_updated, is_deleted, fhir_version
1502 FROM resource_history
1503 WHERE tenant_id = ?1 AND resource_type = ?2",
1504 );
1505
1506 if !params.include_deleted {
1508 sql.push_str(" AND is_deleted = 0");
1509 }
1510
1511 if let Some(since) = ¶ms.since {
1513 sql.push_str(&format!(" AND last_updated >= '{}'", since.to_rfc3339()));
1514 }
1515
1516 if let Some(before) = ¶ms.before {
1518 sql.push_str(&format!(" AND last_updated < '{}'", before.to_rfc3339()));
1519 }
1520
1521 if let Some(cursor) = params.pagination.cursor_value() {
1524 let sort_values = cursor.sort_values();
1525 if sort_values.len() >= 2 {
1526 if let (
1527 Some(CursorValue::String(timestamp)),
1528 Some(CursorValue::String(resource_id)),
1529 ) = (sort_values.first(), sort_values.get(1))
1530 {
1531 sql.push_str(&format!(
1533 " AND (last_updated < '{}' OR (last_updated = '{}' AND id < '{}'))",
1534 timestamp, timestamp, resource_id
1535 ));
1536 }
1537 }
1538 }
1539
1540 sql.push_str(" ORDER BY last_updated DESC, id DESC, CAST(version_id AS INTEGER) DESC");
1542 sql.push_str(&format!(" LIMIT {}", params.pagination.count + 1)); let mut stmt = conn
1545 .prepare(&sql)
1546 .map_err(|e| internal_error(format!("Failed to prepare type history query: {}", e)))?;
1547
1548 let rows = stmt
1549 .query_map(params![tenant_id, resource_type], |row| {
1550 let id: String = row.get(0)?;
1551 let version_id: String = row.get(1)?;
1552 let data: Vec<u8> = row.get(2)?;
1553 let last_updated: String = row.get(3)?;
1554 let is_deleted: i32 = row.get(4)?;
1555 let fhir_version: String = row.get(5)?;
1556 Ok((id, version_id, data, last_updated, is_deleted, fhir_version))
1557 })
1558 .map_err(|e| internal_error(format!("Failed to query type history: {}", e)))?;
1559
1560 let mut entries = Vec::new();
1561 let mut last_entry: Option<(String, String)> = None; for row in rows {
1564 let (id, version_id, data, last_updated_str, is_deleted, fhir_version_str) =
1565 row.map_err(|e| internal_error(format!("Failed to read type history row: {}", e)))?;
1566
1567 if entries.len() >= params.pagination.count as usize {
1569 break;
1570 }
1571
1572 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
1573 serialization_error(format!("Failed to deserialize resource: {}", e))
1574 })?;
1575
1576 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated_str)
1577 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
1578 .with_timezone(&Utc);
1579
1580 let deleted_at = if is_deleted != 0 {
1581 Some(last_updated)
1582 } else {
1583 None
1584 };
1585
1586 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1587 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1588
1589 let resource = StoredResource::from_storage(
1590 resource_type,
1591 &id,
1592 &version_id,
1593 tenant.tenant_id().clone(),
1594 json_data,
1595 last_updated,
1596 last_updated,
1597 deleted_at,
1598 fhir_version,
1599 );
1600
1601 let method = if is_deleted != 0 {
1603 HistoryMethod::Delete
1604 } else if version_id == "1" {
1605 HistoryMethod::Post
1606 } else {
1607 HistoryMethod::Put
1608 };
1609
1610 last_entry = Some((last_updated_str.clone(), id));
1611
1612 entries.push(HistoryEntry {
1613 resource,
1614 method,
1615 timestamp: last_updated,
1616 });
1617 }
1618
1619 let _total_fetched = entries.len();
1621 let has_more = {
1622 let check_sql = sql.replace(
1624 &format!(" LIMIT {}", params.pagination.count + 1),
1625 &format!(" LIMIT {}", params.pagination.count + 2),
1626 );
1627 let mut check_stmt = conn
1628 .prepare(&check_sql)
1629 .map_err(|e| internal_error(format!("Failed to prepare check query: {}", e)))?;
1630 let check_count = check_stmt
1631 .query_map(params![tenant_id, resource_type], |_| Ok(()))
1632 .map_err(|e| internal_error(format!("Failed to check for more results: {}", e)))?
1633 .count();
1634 check_count > params.pagination.count as usize
1635 };
1636
1637 let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
1639 let cursor = PageCursor::new(
1640 vec![CursorValue::String(timestamp), CursorValue::String(id)],
1641 resource_type.to_string(),
1642 );
1643 PageInfo::with_next(cursor)
1644 } else {
1645 PageInfo::end()
1646 };
1647
1648 Ok(Page::new(entries, page_info))
1649 }
1650
1651 async fn history_type_count(
1652 &self,
1653 tenant: &TenantContext,
1654 resource_type: &str,
1655 ) -> StorageResult<u64> {
1656 let conn = self.get_connection()?;
1657 let tenant_id = tenant.tenant_id().as_str();
1658
1659 let count: i64 = conn
1660 .query_row(
1661 "SELECT COUNT(*) FROM resource_history
1662 WHERE tenant_id = ?1 AND resource_type = ?2",
1663 params![tenant_id, resource_type],
1664 |row| row.get(0),
1665 )
1666 .map_err(|e| internal_error(format!("Failed to count type history: {}", e)))?;
1667
1668 Ok(count as u64)
1669 }
1670}
1671
1672#[async_trait]
1673impl SystemHistoryProvider for SqliteBackend {
1674 async fn history_system(
1675 &self,
1676 tenant: &TenantContext,
1677 params: &HistoryParams,
1678 ) -> StorageResult<HistoryPage> {
1679 let conn = self.get_connection()?;
1680 let tenant_id = tenant.tenant_id().as_str();
1681
1682 let mut sql = String::from(
1684 "SELECT resource_type, id, version_id, data, last_updated, is_deleted, fhir_version
1685 FROM resource_history
1686 WHERE tenant_id = ?1",
1687 );
1688
1689 if !params.include_deleted {
1691 sql.push_str(" AND is_deleted = 0");
1692 }
1693
1694 if let Some(since) = ¶ms.since {
1696 sql.push_str(&format!(" AND last_updated >= '{}'", since.to_rfc3339()));
1697 }
1698
1699 if let Some(before) = ¶ms.before {
1701 sql.push_str(&format!(" AND last_updated < '{}'", before.to_rfc3339()));
1702 }
1703
1704 if let Some(cursor) = params.pagination.cursor_value() {
1707 let sort_values = cursor.sort_values();
1708 if sort_values.len() >= 3 {
1709 if let (
1710 Some(CursorValue::String(timestamp)),
1711 Some(CursorValue::String(res_type)),
1712 Some(CursorValue::String(res_id)),
1713 ) = (sort_values.first(), sort_values.get(1), sort_values.get(2))
1714 {
1715 sql.push_str(&format!(
1717 " AND (last_updated < '{}' OR (last_updated = '{}' AND (resource_type < '{}' OR (resource_type = '{}' AND id < '{}'))))",
1718 timestamp, timestamp, res_type, res_type, res_id
1719 ));
1720 }
1721 }
1722 }
1723
1724 sql.push_str(" ORDER BY last_updated DESC, resource_type DESC, id DESC, CAST(version_id AS INTEGER) DESC");
1726 sql.push_str(&format!(" LIMIT {}", params.pagination.count + 1)); let mut stmt = conn.prepare(&sql).map_err(|e| {
1729 internal_error(format!("Failed to prepare system history query: {}", e))
1730 })?;
1731
1732 let rows = stmt
1733 .query_map(params![tenant_id], |row| {
1734 let resource_type: String = row.get(0)?;
1735 let id: String = row.get(1)?;
1736 let version_id: String = row.get(2)?;
1737 let data: Vec<u8> = row.get(3)?;
1738 let last_updated: String = row.get(4)?;
1739 let is_deleted: i32 = row.get(5)?;
1740 let fhir_version: String = row.get(6)?;
1741 Ok((
1742 resource_type,
1743 id,
1744 version_id,
1745 data,
1746 last_updated,
1747 is_deleted,
1748 fhir_version,
1749 ))
1750 })
1751 .map_err(|e| internal_error(format!("Failed to query system history: {}", e)))?;
1752
1753 let mut entries = Vec::new();
1754 let mut last_entry: Option<(String, String, String)> = None; for row in rows {
1757 let (
1758 resource_type,
1759 id,
1760 version_id,
1761 data,
1762 last_updated_str,
1763 is_deleted,
1764 fhir_version_str,
1765 ) = row
1766 .map_err(|e| internal_error(format!("Failed to read system history row: {}", e)))?;
1767
1768 if entries.len() >= params.pagination.count as usize {
1770 break;
1771 }
1772
1773 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
1774 serialization_error(format!("Failed to deserialize resource: {}", e))
1775 })?;
1776
1777 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated_str)
1778 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
1779 .with_timezone(&Utc);
1780
1781 let deleted_at = if is_deleted != 0 {
1782 Some(last_updated)
1783 } else {
1784 None
1785 };
1786
1787 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1788 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1789
1790 let resource = StoredResource::from_storage(
1791 &resource_type,
1792 &id,
1793 &version_id,
1794 tenant.tenant_id().clone(),
1795 json_data,
1796 last_updated,
1797 last_updated,
1798 deleted_at,
1799 fhir_version,
1800 );
1801
1802 let method = if is_deleted != 0 {
1804 HistoryMethod::Delete
1805 } else if version_id == "1" {
1806 HistoryMethod::Post
1807 } else {
1808 HistoryMethod::Put
1809 };
1810
1811 last_entry = Some((last_updated_str.clone(), resource_type, id));
1812
1813 entries.push(HistoryEntry {
1814 resource,
1815 method,
1816 timestamp: last_updated,
1817 });
1818 }
1819
1820 let has_more = {
1822 let check_sql = sql.replace(
1823 &format!(" LIMIT {}", params.pagination.count + 1),
1824 &format!(" LIMIT {}", params.pagination.count + 2),
1825 );
1826 let mut check_stmt = conn
1827 .prepare(&check_sql)
1828 .map_err(|e| internal_error(format!("Failed to prepare check query: {}", e)))?;
1829 let check_count = check_stmt
1830 .query_map(params![tenant_id], |_| Ok(()))
1831 .map_err(|e| internal_error(format!("Failed to check for more results: {}", e)))?
1832 .count();
1833 check_count > params.pagination.count as usize
1834 };
1835
1836 let page_info = if let (true, Some((timestamp, resource_type, id))) = (has_more, last_entry)
1838 {
1839 let cursor = PageCursor::new(
1840 vec![
1841 CursorValue::String(timestamp),
1842 CursorValue::String(resource_type),
1843 CursorValue::String(id),
1844 ],
1845 "system".to_string(),
1846 );
1847 PageInfo::with_next(cursor)
1848 } else {
1849 PageInfo::end()
1850 };
1851
1852 Ok(Page::new(entries, page_info))
1853 }
1854
1855 async fn history_system_count(&self, tenant: &TenantContext) -> StorageResult<u64> {
1856 let conn = self.get_connection()?;
1857 let tenant_id = tenant.tenant_id().as_str();
1858
1859 let count: i64 = conn
1860 .query_row(
1861 "SELECT COUNT(*) FROM resource_history WHERE tenant_id = ?1",
1862 params![tenant_id],
1863 |row| row.get(0),
1864 )
1865 .map_err(|e| internal_error(format!("Failed to count system history: {}", e)))?;
1866
1867 Ok(count as u64)
1868 }
1869}
1870
1871#[async_trait]
1872impl PurgableStorage for SqliteBackend {
1873 async fn purge(
1874 &self,
1875 tenant: &TenantContext,
1876 resource_type: &str,
1877 id: &str,
1878 ) -> StorageResult<()> {
1879 let conn = self.get_connection()?;
1880 let tenant_id = tenant.tenant_id().as_str();
1881
1882 let exists: bool = conn
1884 .query_row(
1885 "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1886 params![tenant_id, resource_type, id],
1887 |_| Ok(true),
1888 )
1889 .unwrap_or(false);
1890
1891 if !exists {
1892 let history_exists: bool = conn
1894 .query_row(
1895 "SELECT 1 FROM resource_history WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1896 params![tenant_id, resource_type, id],
1897 |_| Ok(true),
1898 )
1899 .unwrap_or(false);
1900
1901 if !history_exists {
1902 return Err(StorageError::Resource(ResourceError::NotFound {
1903 resource_type: resource_type.to_string(),
1904 id: id.to_string(),
1905 }));
1906 }
1907 }
1908
1909 conn.execute(
1911 "DELETE FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1912 params![tenant_id, resource_type, id],
1913 )
1914 .map_err(|e| internal_error(format!("Failed to purge resource: {}", e)))?;
1915
1916 conn.execute(
1918 "DELETE FROM resource_history WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1919 params![tenant_id, resource_type, id],
1920 )
1921 .map_err(|e| internal_error(format!("Failed to purge resource history: {}", e)))?;
1922
1923 conn.execute(
1925 "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
1926 params![tenant_id, resource_type, id],
1927 )
1928 .map_err(|e| internal_error(format!("Failed to purge search index: {}", e)))?;
1929
1930 Ok(())
1931 }
1932
1933 async fn purge_all(&self, tenant: &TenantContext, resource_type: &str) -> StorageResult<u64> {
1934 let conn = self.get_connection()?;
1935 let tenant_id = tenant.tenant_id().as_str();
1936
1937 let count: i64 = conn
1939 .query_row(
1940 "SELECT COUNT(DISTINCT id) FROM resources WHERE tenant_id = ?1 AND resource_type = ?2",
1941 params![tenant_id, resource_type],
1942 |row| row.get(0),
1943 )
1944 .unwrap_or(0);
1945
1946 conn.execute(
1948 "DELETE FROM resources WHERE tenant_id = ?1 AND resource_type = ?2",
1949 params![tenant_id, resource_type],
1950 )
1951 .map_err(|e| internal_error(format!("Failed to purge resources: {}", e)))?;
1952
1953 conn.execute(
1955 "DELETE FROM resource_history WHERE tenant_id = ?1 AND resource_type = ?2",
1956 params![tenant_id, resource_type],
1957 )
1958 .map_err(|e| internal_error(format!("Failed to purge resource history: {}", e)))?;
1959
1960 conn.execute(
1962 "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2",
1963 params![tenant_id, resource_type],
1964 )
1965 .map_err(|e| internal_error(format!("Failed to purge search index: {}", e)))?;
1966
1967 Ok(count as u64)
1968 }
1969}
1970
1971#[async_trait]
1972impl DifferentialHistoryProvider for SqliteBackend {
1973 async fn modified_since(
1974 &self,
1975 tenant: &TenantContext,
1976 resource_type: Option<&str>,
1977 since: chrono::DateTime<Utc>,
1978 pagination: &Pagination,
1979 ) -> StorageResult<Page<StoredResource>> {
1980 let conn = self.get_connection()?;
1981 let tenant_id = tenant.tenant_id().as_str();
1982 let since_str = since.to_rfc3339();
1983
1984 let mut sql = String::from(
1986 "SELECT resource_type, id, version_id, data, last_updated, fhir_version
1987 FROM resources
1988 WHERE tenant_id = ?1 AND last_updated > ?2 AND is_deleted = 0",
1989 );
1990
1991 if let Some(rt) = resource_type {
1993 sql.push_str(&format!(" AND resource_type = '{}'", rt));
1994 }
1995
1996 if let Some(cursor) = pagination.cursor_value() {
1998 let sort_values = cursor.sort_values();
1999 if sort_values.len() >= 2 {
2000 if let (Some(CursorValue::String(timestamp)), Some(CursorValue::String(res_id))) =
2001 (sort_values.first(), sort_values.get(1))
2002 {
2003 sql.push_str(&format!(
2004 " AND (last_updated > '{}' OR (last_updated = '{}' AND id > '{}'))",
2005 timestamp, timestamp, res_id
2006 ));
2007 }
2008 }
2009 }
2010
2011 sql.push_str(" ORDER BY last_updated ASC, id ASC");
2013 sql.push_str(&format!(" LIMIT {}", pagination.count + 1));
2014
2015 let mut stmt = conn.prepare(&sql).map_err(|e| {
2016 internal_error(format!("Failed to prepare modified_since query: {}", e))
2017 })?;
2018
2019 let rows = stmt
2020 .query_map(params![tenant_id, since_str], |row| {
2021 let resource_type: String = row.get(0)?;
2022 let id: String = row.get(1)?;
2023 let version_id: String = row.get(2)?;
2024 let data: Vec<u8> = row.get(3)?;
2025 let last_updated: String = row.get(4)?;
2026 let fhir_version: String = row.get(5)?;
2027 Ok((
2028 resource_type,
2029 id,
2030 version_id,
2031 data,
2032 last_updated,
2033 fhir_version,
2034 ))
2035 })
2036 .map_err(|e| internal_error(format!("Failed to query modified resources: {}", e)))?;
2037
2038 let mut resources = Vec::new();
2039 let mut last_entry: Option<(String, String)> = None; for row in rows {
2042 let (resource_type, id, version_id, data, last_updated_str, fhir_version_str) =
2043 row.map_err(|e| internal_error(format!("Failed to read row: {}", e)))?;
2044
2045 if resources.len() >= pagination.count as usize {
2047 break;
2048 }
2049
2050 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
2051 serialization_error(format!("Failed to deserialize resource: {}", e))
2052 })?;
2053
2054 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated_str)
2055 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
2056 .with_timezone(&Utc);
2057
2058 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
2059 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
2060
2061 let resource = StoredResource::from_storage(
2062 &resource_type,
2063 &id,
2064 &version_id,
2065 tenant.tenant_id().clone(),
2066 json_data,
2067 last_updated,
2068 last_updated,
2069 None,
2070 fhir_version,
2071 );
2072
2073 last_entry = Some((last_updated_str, id));
2074 resources.push(resource);
2075 }
2076
2077 let has_more = {
2079 let check_sql = sql.replace(
2080 &format!(" LIMIT {}", pagination.count + 1),
2081 &format!(" LIMIT {}", pagination.count + 2),
2082 );
2083 let mut check_stmt = conn
2084 .prepare(&check_sql)
2085 .map_err(|e| internal_error(format!("Failed to prepare check query: {}", e)))?;
2086 let check_count = check_stmt
2087 .query_map(params![tenant_id, since_str], |_| Ok(()))
2088 .map_err(|e| internal_error(format!("Failed to check for more results: {}", e)))?
2089 .count();
2090 check_count > pagination.count as usize
2091 };
2092
2093 let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
2095 let cursor = PageCursor::new(
2096 vec![CursorValue::String(timestamp), CursorValue::String(id)],
2097 "modified_since".to_string(),
2098 );
2099 PageInfo::with_next(cursor)
2100 } else {
2101 PageInfo::end()
2102 };
2103
2104 Ok(Page::new(resources, page_info))
2105 }
2106}
2107
/// Splits a `name=value&name=value` string into `(name, value)` pairs.
///
/// Each `&`-separated segment is cut at its first `=`; everything after that
/// `=` (including further `=` characters) is the value. Segments without any
/// `=` are dropped. No percent-decoding is performed.
fn parse_simple_search_params(params: &str) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for segment in params.split('&') {
        if let Some((name, value)) = segment.split_once('=') {
            pairs.push((name.to_string(), value.to_string()));
        }
    }
    pairs
}
2123
2124#[async_trait]
2125impl ConditionalStorage for SqliteBackend {
2126 async fn conditional_create(
2127 &self,
2128 tenant: &TenantContext,
2129 resource_type: &str,
2130 resource: Value,
2131 search_params: &str,
2132 fhir_version: FhirVersion,
2133 ) -> StorageResult<ConditionalCreateResult> {
2134 let matches = self
2136 .find_matching_resources(tenant, resource_type, search_params)
2137 .await?;
2138
2139 match matches.len() {
2140 0 => {
2141 let created = self
2143 .create(tenant, resource_type, resource, fhir_version)
2144 .await?;
2145 Ok(ConditionalCreateResult::Created(created))
2146 }
2147 1 => {
2148 Ok(ConditionalCreateResult::Exists(
2150 matches.into_iter().next().unwrap(),
2151 ))
2152 }
2153 n => {
2154 Ok(ConditionalCreateResult::MultipleMatches(n))
2156 }
2157 }
2158 }
2159
2160 async fn conditional_update(
2161 &self,
2162 tenant: &TenantContext,
2163 resource_type: &str,
2164 resource: Value,
2165 search_params: &str,
2166 upsert: bool,
2167 fhir_version: FhirVersion,
2168 ) -> StorageResult<ConditionalUpdateResult> {
2169 let matches = self
2171 .find_matching_resources(tenant, resource_type, search_params)
2172 .await?;
2173
2174 match matches.len() {
2175 0 => {
2176 if upsert {
2177 let created = self
2179 .create(tenant, resource_type, resource, fhir_version)
2180 .await?;
2181 Ok(ConditionalUpdateResult::Created(created))
2182 } else {
2183 Ok(ConditionalUpdateResult::NoMatch)
2185 }
2186 }
2187 1 => {
2188 let existing = matches.into_iter().next().unwrap();
2190 let updated = self.update(tenant, &existing, resource).await?;
2191 Ok(ConditionalUpdateResult::Updated(updated))
2192 }
2193 n => {
2194 Ok(ConditionalUpdateResult::MultipleMatches(n))
2196 }
2197 }
2198 }
2199
2200 async fn conditional_delete(
2201 &self,
2202 tenant: &TenantContext,
2203 resource_type: &str,
2204 search_params: &str,
2205 ) -> StorageResult<ConditionalDeleteResult> {
2206 let matches = self
2208 .find_matching_resources(tenant, resource_type, search_params)
2209 .await?;
2210
2211 match matches.len() {
2212 0 => {
2213 Ok(ConditionalDeleteResult::NoMatch)
2215 }
2216 1 => {
2217 let existing = matches.into_iter().next().unwrap();
2219 self.delete(tenant, resource_type, existing.id()).await?;
2220 Ok(ConditionalDeleteResult::Deleted)
2221 }
2222 n => {
2223 Ok(ConditionalDeleteResult::MultipleMatches(n))
2225 }
2226 }
2227 }
2228
2229 async fn conditional_patch(
2239 &self,
2240 tenant: &TenantContext,
2241 resource_type: &str,
2242 search_params: &str,
2243 patch: &crate::core::PatchFormat,
2244 ) -> StorageResult<crate::core::ConditionalPatchResult> {
2245 use crate::core::{ConditionalPatchResult, PatchFormat};
2246
2247 let matches = self
2249 .find_matching_resources(tenant, resource_type, search_params)
2250 .await?;
2251
2252 match matches.len() {
2253 0 => Ok(ConditionalPatchResult::NoMatch),
2254 1 => {
2255 let existing = matches.into_iter().next().unwrap();
2257 let current_content = existing.content().clone();
2258
2259 let patched_content = match patch {
2261 PatchFormat::JsonPatch(patch_doc) => {
2262 self.apply_json_patch(¤t_content, patch_doc)?
2263 }
2264 PatchFormat::FhirPathPatch(patch_params) => {
2265 self.apply_fhirpath_patch(¤t_content, patch_params)?
2266 }
2267 PatchFormat::MergePatch(merge_doc) => {
2268 self.apply_merge_patch(¤t_content, merge_doc)
2269 }
2270 };
2271
2272 let updated = self.update(tenant, &existing, patched_content).await?;
2274 Ok(ConditionalPatchResult::Patched(updated))
2275 }
2276 n => Ok(ConditionalPatchResult::MultipleMatches(n)),
2277 }
2278 }
2279}
2280
2281impl SqliteBackend {
2282 async fn find_matching_resources(
2287 &self,
2288 tenant: &TenantContext,
2289 resource_type: &str,
2290 search_params_str: &str,
2291 ) -> StorageResult<Vec<StoredResource>> {
2292 let parsed_params = parse_simple_search_params(search_params_str);
2294
2295 if parsed_params.is_empty() {
2296 return Ok(Vec::new());
2299 }
2300
2301 let search_params = self.build_search_parameters(resource_type, &parsed_params)?;
2303
2304 let query = SearchQuery {
2306 resource_type: resource_type.to_string(),
2307 parameters: search_params,
2308 count: Some(1000), ..Default::default()
2311 };
2312
2313 let result = <Self as SearchProvider>::search(self, tenant, &query).await?;
2315
2316 Ok(result.resources.items)
2317 }
2318
2319 fn build_search_parameters(
2324 &self,
2325 resource_type: &str,
2326 params: &[(String, String)],
2327 ) -> StorageResult<Vec<SearchParameter>> {
2328 let registry = self.search_registry().read();
2329 let mut search_params = Vec::with_capacity(params.len());
2330
2331 for (name, value) in params {
2332 let param_type = self
2334 .lookup_param_type(®istry, resource_type, name)
2335 .unwrap_or({
2336 match name.as_str() {
2338 "_id" => SearchParamType::Token,
2339 "_lastUpdated" => SearchParamType::Date,
2340 "_tag" | "_profile" | "_security" => SearchParamType::Token,
2341 "identifier" => SearchParamType::Token,
2342 "patient" | "subject" | "encounter" | "performer" | "author"
2344 | "requester" | "recorder" | "asserter" | "practitioner"
2345 | "organization" | "location" | "device" => SearchParamType::Reference,
2346 _ => SearchParamType::String, }
2348 });
2349
2350 search_params.push(SearchParameter {
2351 name: name.clone(),
2352 param_type,
2353 modifier: None,
2354 values: vec![SearchValue::parse(value)],
2355 chain: vec![],
2356 components: vec![],
2357 });
2358 }
2359
2360 Ok(search_params)
2361 }
2362
2363 fn lookup_param_type(
2367 &self,
2368 registry: &crate::search::SearchParameterRegistry,
2369 resource_type: &str,
2370 param_name: &str,
2371 ) -> Option<SearchParamType> {
2372 if let Some(def) = registry.get_param(resource_type, param_name) {
2374 return Some(def.param_type);
2375 }
2376
2377 if let Some(def) = registry.get_param("Resource", param_name) {
2379 return Some(def.param_type);
2380 }
2381
2382 None
2383 }
2384
2385 fn apply_json_patch(&self, resource: &Value, patch_doc: &Value) -> StorageResult<Value> {
2399 use crate::error::ValidationError;
2400
2401 let patch: json_patch::Patch = serde_json::from_value(patch_doc.clone()).map_err(|e| {
2403 StorageError::Validation(ValidationError::InvalidResource {
2404 message: format!("Invalid JSON Patch document: {}", e),
2405 details: vec![],
2406 })
2407 })?;
2408
2409 let mut patched = resource.clone();
2411 json_patch::patch(&mut patched, &patch).map_err(|e| {
2412 StorageError::Validation(ValidationError::InvalidResource {
2413 message: format!("Failed to apply JSON Patch: {}", e),
2414 details: vec![],
2415 })
2416 })?;
2417
2418 Ok(patched)
2419 }
2420
2421 fn apply_fhirpath_patch(&self, resource: &Value, patch_params: &Value) -> StorageResult<Value> {
2432 use crate::error::ValidationError;
2433
2434 let parameter = patch_params.get("parameter").and_then(|p| p.as_array());
2436 if parameter.is_none() {
2437 return Err(StorageError::Validation(ValidationError::InvalidResource {
2438 message: "FHIRPath Patch must have a 'parameter' array".to_string(),
2439 details: vec![],
2440 }));
2441 }
2442
2443 let mut patched = resource.clone();
2444
2445 for operation in parameter.unwrap() {
2446 let parts = operation.get("part").and_then(|p| p.as_array());
2448 if parts.is_none() {
2449 continue;
2450 }
2451
2452 let mut op_type = None;
2453 let mut op_path = None;
2454 let mut op_name = None;
2455 let mut op_value = None;
2456
2457 for part in parts.unwrap() {
2458 match part.get("name").and_then(|n| n.as_str()) {
2459 Some("type") => {
2460 op_type = part
2461 .get("valueCode")
2462 .and_then(|v| v.as_str())
2463 .map(|s| s.to_string());
2464 }
2465 Some("path") => {
2466 op_path = part
2467 .get("valueString")
2468 .and_then(|v| v.as_str())
2469 .map(|s| s.to_string());
2470 }
2471 Some("name") => {
2472 op_name = part
2473 .get("valueString")
2474 .and_then(|v| v.as_str())
2475 .map(|s| s.to_string());
2476 }
2477 Some("value") => {
2478 op_value = extract_part_value(part);
2479 }
2480 _ => {}
2481 }
2482 }
2483
2484 match op_type.as_deref() {
2486 Some("replace") => {
2487 if let (Some(path), Some(value)) = (&op_path, &op_value) {
2488 self.fhirpath_replace(&mut patched, path, value)?;
2489 }
2490 }
2491 Some("add") => {
2492 if let (Some(path), Some(name), Some(value)) = (&op_path, &op_name, &op_value) {
2493 self.fhirpath_add(&mut patched, path, name, value)?;
2494 }
2495 }
2496 Some("delete") => {
2497 if let Some(path) = &op_path {
2498 self.fhirpath_delete(&mut patched, path)?;
2499 }
2500 }
2501 _ => {
2502 }
2504 }
2505 }
2506
2507 Ok(patched)
2508 }
2509
2510 fn fhirpath_replace(
2512 &self,
2513 resource: &mut Value,
2514 path: &str,
2515 value: &Value,
2516 ) -> StorageResult<()> {
2517 let parts: Vec<&str> = path.split('.').collect();
2520 if parts.len() == 2 {
2521 if let Some(obj) = resource.as_object_mut() {
2523 obj.insert(parts[1].to_string(), value.clone());
2524 }
2525 }
2526 Ok(())
2527 }
2528
2529 fn fhirpath_add(
2531 &self,
2532 resource: &mut Value,
2533 path: &str,
2534 name: &str,
2535 value: &Value,
2536 ) -> StorageResult<()> {
2537 let parts: Vec<&str> = path.split('.').collect();
2539 if parts.len() == 1
2540 && parts[0]
2541 == resource
2542 .get("resourceType")
2543 .and_then(|r| r.as_str())
2544 .unwrap_or("")
2545 {
2546 if let Some(obj) = resource.as_object_mut() {
2548 obj.insert(name.to_string(), value.clone());
2549 }
2550 }
2551 Ok(())
2552 }
2553
2554 fn fhirpath_delete(&self, resource: &mut Value, path: &str) -> StorageResult<()> {
2556 let parts: Vec<&str> = path.split('.').collect();
2558 if parts.len() == 2 {
2559 if let Some(obj) = resource.as_object_mut() {
2560 obj.remove(parts[1]);
2561 }
2562 }
2563 Ok(())
2564 }
2565
2566 fn apply_merge_patch(&self, resource: &Value, merge_doc: &Value) -> Value {
2573 let mut patched = resource.clone();
2574 json_patch::merge(&mut patched, merge_doc);
2575 patched
2576 }
2577}
2578
2579#[async_trait]
2580impl BundleProvider for SqliteBackend {
2581 async fn process_transaction(
2582 &self,
2583 tenant: &TenantContext,
2584 entries: Vec<BundleEntry>,
2585 ) -> Result<BundleResult, TransactionError> {
2586 use crate::core::transaction::{Transaction, TransactionOptions, TransactionProvider};
2587 use std::collections::HashMap;
2588
2589 let mut tx = self
2591 .begin_transaction(tenant, TransactionOptions::new())
2592 .await
2593 .map_err(|e| TransactionError::RolledBack {
2594 reason: format!("Failed to begin transaction: {}", e),
2595 })?;
2596
2597 let mut results = Vec::with_capacity(entries.len());
2598 let mut error_info: Option<(usize, String)> = None;
2599
2600 let mut reference_map: HashMap<String, String> = HashMap::new();
2603
2604 let mut entries = entries;
2606
2607 for (idx, entry) in entries.iter_mut().enumerate() {
2609 if let Some(ref mut resource) = entry.resource {
2611 resolve_bundle_references(resource, &reference_map);
2612 }
2613
2614 let result = self.process_bundle_entry_tx(&mut tx, entry).await;
2615
2616 match result {
2617 Ok(entry_result) => {
2618 if entry_result.status >= 400 {
2620 error_info = Some((
2621 idx,
2622 format!("Entry failed with status {}", entry_result.status),
2623 ));
2624 break;
2625 }
2626
2627 if entry.method == BundleMethod::Post {
2629 if let Some(ref full_url) = entry.full_url {
2630 if let Some(ref location) = entry_result.location {
2631 let reference = location
2634 .split("/_history")
2635 .next()
2636 .unwrap_or(location)
2637 .to_string();
2638 reference_map.insert(full_url.clone(), reference);
2639 }
2640 }
2641 }
2642
2643 results.push(entry_result);
2644 }
2645 Err(e) => {
2646 error_info = Some((idx, format!("Entry processing failed: {}", e)));
2647 break;
2648 }
2649 }
2650 }
2651
2652 if let Some((index, message)) = error_info {
2654 let _ = Box::new(tx).rollback().await;
2655 return Err(TransactionError::BundleError { index, message });
2656 }
2657
2658 Box::new(tx)
2660 .commit()
2661 .await
2662 .map_err(|e| TransactionError::RolledBack {
2663 reason: format!("Commit failed: {}", e),
2664 })?;
2665
2666 Ok(BundleResult {
2667 bundle_type: BundleType::Transaction,
2668 entries: results,
2669 })
2670 }
2671
2672 async fn process_batch(
2673 &self,
2674 tenant: &TenantContext,
2675 entries: Vec<BundleEntry>,
2676 ) -> StorageResult<BundleResult> {
2677 let mut results = Vec::with_capacity(entries.len());
2678
2679 for entry in &entries {
2681 let result = self.process_batch_entry(tenant, entry).await;
2682 results.push(result);
2683 }
2684
2685 Ok(BundleResult {
2686 bundle_type: BundleType::Batch,
2687 entries: results,
2688 })
2689 }
2690}
2691
2692impl SqliteBackend {
2693 async fn process_bundle_entry_tx(
2695 &self,
2696 tx: &mut crate::backends::sqlite::transaction::SqliteTransaction,
2697 entry: &BundleEntry,
2698 ) -> StorageResult<BundleEntryResult> {
2699 use crate::core::transaction::Transaction;
2700
2701 match entry.method {
2702 BundleMethod::Get => {
2703 let (resource_type, id) = self.parse_url(&entry.url)?;
2705 match tx.read(&resource_type, &id).await? {
2706 Some(resource) => Ok(BundleEntryResult::ok(resource)),
2707 None => Ok(BundleEntryResult::error(
2708 404,
2709 serde_json::json!({
2710 "resourceType": "OperationOutcome",
2711 "issue": [{"severity": "error", "code": "not-found"}]
2712 }),
2713 )),
2714 }
2715 }
2716 BundleMethod::Post => {
2717 let resource = entry.resource.clone().ok_or_else(|| {
2719 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2720 field: "resource".to_string(),
2721 })
2722 })?;
2723
2724 let resource_type = resource
2725 .get("resourceType")
2726 .and_then(|v| v.as_str())
2727 .map(|s| s.to_string())
2728 .ok_or_else(|| {
2729 StorageError::Validation(
2730 crate::error::ValidationError::MissingRequiredField {
2731 field: "resourceType".to_string(),
2732 },
2733 )
2734 })?;
2735
2736 let created = tx.create(&resource_type, resource).await?;
2737 Ok(BundleEntryResult::created(created))
2738 }
2739 BundleMethod::Put => {
2740 let resource = entry.resource.clone().ok_or_else(|| {
2742 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2743 field: "resource".to_string(),
2744 })
2745 })?;
2746
2747 let (resource_type, id) = self.parse_url(&entry.url)?;
2748
2749 match tx.read(&resource_type, &id).await? {
2751 Some(existing) => {
2752 if let Some(ref if_match) = entry.if_match {
2754 let current_etag = existing.etag();
2755 if current_etag != if_match.as_str() {
2756 return Ok(BundleEntryResult::error(
2757 412,
2758 serde_json::json!({
2759 "resourceType": "OperationOutcome",
2760 "issue": [{"severity": "error", "code": "conflict", "diagnostics": "ETag mismatch"}]
2761 }),
2762 ));
2763 }
2764 }
2765 let updated = tx.update(&existing, resource).await?;
2766 Ok(BundleEntryResult::ok(updated))
2767 }
2768 None => {
2769 let mut resource_with_id = resource;
2771 resource_with_id["id"] = serde_json::json!(id);
2772 let created = tx.create(&resource_type, resource_with_id).await?;
2773 Ok(BundleEntryResult::created(created))
2774 }
2775 }
2776 }
2777 BundleMethod::Delete => {
2778 let (resource_type, id) = self.parse_url(&entry.url)?;
2779 tx.delete(&resource_type, &id).await?;
2780 Ok(BundleEntryResult::deleted())
2781 }
2782 BundleMethod::Patch => {
2783 Ok(BundleEntryResult::error(
2785 501,
2786 serde_json::json!({
2787 "resourceType": "OperationOutcome",
2788 "issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented"}]
2789 }),
2790 ))
2791 }
2792 }
2793 }
2794
2795 async fn process_batch_entry(
2797 &self,
2798 tenant: &TenantContext,
2799 entry: &BundleEntry,
2800 ) -> BundleEntryResult {
2801 match self.process_batch_entry_inner(tenant, entry).await {
2802 Ok(result) => result,
2803 Err(e) => BundleEntryResult::error(
2804 500,
2805 serde_json::json!({
2806 "resourceType": "OperationOutcome",
2807 "issue": [{"severity": "error", "code": "exception", "diagnostics": e.to_string()}]
2808 }),
2809 ),
2810 }
2811 }
2812
2813 async fn process_batch_entry_inner(
2814 &self,
2815 tenant: &TenantContext,
2816 entry: &BundleEntry,
2817 ) -> StorageResult<BundleEntryResult> {
2818 match entry.method {
2819 BundleMethod::Get => {
2820 let (resource_type, id) = self.parse_url(&entry.url)?;
2821 match self.read(tenant, &resource_type, &id).await? {
2822 Some(resource) => Ok(BundleEntryResult::ok(resource)),
2823 None => Ok(BundleEntryResult::error(
2824 404,
2825 serde_json::json!({
2826 "resourceType": "OperationOutcome",
2827 "issue": [{"severity": "error", "code": "not-found"}]
2828 }),
2829 )),
2830 }
2831 }
2832 BundleMethod::Post => {
2833 let resource = entry.resource.clone().ok_or_else(|| {
2834 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2835 field: "resource".to_string(),
2836 })
2837 })?;
2838
2839 let resource_type = resource
2840 .get("resourceType")
2841 .and_then(|v| v.as_str())
2842 .map(|s| s.to_string())
2843 .ok_or_else(|| {
2844 StorageError::Validation(
2845 crate::error::ValidationError::MissingRequiredField {
2846 field: "resourceType".to_string(),
2847 },
2848 )
2849 })?;
2850
2851 let created = self
2853 .create(
2854 tenant,
2855 &resource_type,
2856 resource,
2857 FhirVersion::default_enabled(),
2858 )
2859 .await?;
2860 Ok(BundleEntryResult::created(created))
2861 }
2862 BundleMethod::Put => {
2863 let resource = entry.resource.clone().ok_or_else(|| {
2864 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2865 field: "resource".to_string(),
2866 })
2867 })?;
2868
2869 let (resource_type, id) = self.parse_url(&entry.url)?;
2870 let (stored, _created) = self
2872 .create_or_update(
2873 tenant,
2874 &resource_type,
2875 &id,
2876 resource,
2877 FhirVersion::default_enabled(),
2878 )
2879 .await?;
2880 Ok(BundleEntryResult::ok(stored))
2881 }
2882 BundleMethod::Delete => {
2883 let (resource_type, id) = self.parse_url(&entry.url)?;
2884 match self.delete(tenant, &resource_type, &id).await {
2885 Ok(()) => Ok(BundleEntryResult::deleted()),
2886 Err(StorageError::Resource(ResourceError::NotFound { .. })) => {
2887 Ok(BundleEntryResult::deleted()) }
2889 Err(e) => Err(e),
2890 }
2891 }
2892 BundleMethod::Patch => Ok(BundleEntryResult::error(
2893 501,
2894 serde_json::json!({
2895 "resourceType": "OperationOutcome",
2896 "issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented"}]
2897 }),
2898 )),
2899 }
2900 }
2901
2902 fn parse_url(&self, url: &str) -> StorageResult<(String, String)> {
2904 let path = url
2909 .strip_prefix("http://")
2910 .or_else(|| url.strip_prefix("https://"))
2911 .map(|s| {
2912 s.find('/').map(|i| &s[i..]).unwrap_or(s)
2914 })
2915 .unwrap_or(url);
2916
2917 let path = path.trim_start_matches('/');
2918 let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
2919
2920 if parts.len() >= 2 {
2923 let len = parts.len();
2924 Ok((parts[len - 2].to_string(), parts[len - 1].to_string()))
2925 } else {
2926 Err(StorageError::Validation(
2927 crate::error::ValidationError::InvalidReference {
2928 reference: url.to_string(),
2929 message: "URL must be in format ResourceType/id".to_string(),
2930 },
2931 ))
2932 }
2933 }
2934}
2935
2936fn resolve_bundle_references(
2941 value: &mut serde_json::Value,
2942 reference_map: &std::collections::HashMap<String, String>,
2943) {
2944 use serde_json::Value;
2945 match value {
2946 Value::Object(map) => {
2947 if let Some(Value::String(ref_str)) = map.get("reference") {
2949 if ref_str.starts_with("urn:uuid:") {
2950 if let Some(resolved) = reference_map.get(ref_str) {
2951 map.insert("reference".to_string(), Value::String(resolved.clone()));
2952 }
2953 }
2954 }
2955 for v in map.values_mut() {
2957 resolve_bundle_references(v, reference_map);
2958 }
2959 }
2960 Value::Array(arr) => {
2961 for item in arr {
2962 resolve_bundle_references(item, reference_map);
2963 }
2964 }
2965 _ => {}
2966 }
2967}
2968
2969#[async_trait]
2971impl ReindexableStorage for SqliteBackend {
2972 async fn list_resource_types(&self, tenant: &TenantContext) -> StorageResult<Vec<String>> {
2973 let conn = self.get_connection()?;
2974 let tenant_id = tenant.tenant_id().as_str().to_string();
2975
2976 let mut stmt = conn
2977 .prepare(
2978 "SELECT DISTINCT resource_type FROM resources WHERE tenant_id = ?1 AND is_deleted = 0",
2979 )
2980 .map_err(|e| internal_error(format!("Failed to prepare statement: {}", e)))?;
2981
2982 let types: Vec<String> = stmt
2983 .query_map([&tenant_id], |row| row.get(0))
2984 .map_err(|e| internal_error(format!("Failed to query resource types: {}", e)))?
2985 .filter_map(|r| r.ok())
2986 .collect();
2987
2988 Ok(types)
2989 }
2990
2991 async fn count_resources(
2992 &self,
2993 tenant: &TenantContext,
2994 resource_type: &str,
2995 ) -> StorageResult<u64> {
2996 self.count(tenant, Some(resource_type)).await
2997 }
2998
2999 async fn fetch_resources_page(
3000 &self,
3001 tenant: &TenantContext,
3002 resource_type: &str,
3003 cursor: Option<&str>,
3004 limit: u32,
3005 ) -> StorageResult<ResourcePage> {
3006 let conn = self.get_connection()?;
3007 let tenant_id = tenant.tenant_id().as_str().to_string();
3008
3009 let (cursor_ts, cursor_id) = if let Some(c) = cursor {
3011 let parts: Vec<&str> = c.split('|').collect();
3012 if parts.len() == 2 {
3013 (Some(parts[0].to_string()), Some(parts[1].to_string()))
3014 } else {
3015 (None, None)
3016 }
3017 } else {
3018 (None, None)
3019 };
3020
3021 let (sql, params): (String, Vec<Box<dyn ToSql>>) =
3023 if let (Some(ts), Some(id)) = (&cursor_ts, &cursor_id) {
3024 (
3025 "SELECT id, version_id, data, last_updated, fhir_version FROM resources \
3026 WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0 \
3027 AND (last_updated > ?3 OR (last_updated = ?3 AND id > ?4)) \
3028 ORDER BY last_updated ASC, id ASC LIMIT ?5"
3029 .to_string(),
3030 vec![
3031 Box::new(tenant_id.clone()) as Box<dyn ToSql>,
3032 Box::new(resource_type.to_string()),
3033 Box::new(ts.clone()),
3034 Box::new(id.clone()),
3035 Box::new(limit as i64),
3036 ],
3037 )
3038 } else {
3039 (
3040 "SELECT id, version_id, data, last_updated, fhir_version FROM resources \
3041 WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0 \
3042 ORDER BY last_updated ASC, id ASC LIMIT ?3"
3043 .to_string(),
3044 vec![
3045 Box::new(tenant_id.clone()) as Box<dyn ToSql>,
3046 Box::new(resource_type.to_string()),
3047 Box::new(limit as i64),
3048 ],
3049 )
3050 };
3051
3052 let mut stmt = conn
3053 .prepare(&sql)
3054 .map_err(|e| internal_error(format!("Failed to prepare statement: {}", e)))?;
3055
3056 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
3057
3058 let resources: Vec<StoredResource> = stmt
3059 .query_map(param_refs.as_slice(), |row| {
3060 let id: String = row.get(0)?;
3061 let version_id: String = row.get(1)?;
3062 let data: Vec<u8> = row.get(2)?;
3063 let last_updated: String = row.get(3)?;
3064 let fhir_version: String = row.get(4)?;
3065
3066 Ok((id, version_id, data, last_updated, fhir_version))
3067 })
3068 .map_err(|e| internal_error(format!("Failed to query resources: {}", e)))?
3069 .filter_map(|r| r.ok())
3070 .filter_map(|(id, version_id, data, last_updated, fhir_version_str)| {
3071 let content: Value = serde_json::from_slice(&data).ok()?;
3072 let last_modified = chrono::DateTime::parse_from_rfc3339(&last_updated)
3073 .ok()?
3074 .with_timezone(&Utc);
3075 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
3076 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
3077 Some(StoredResource::from_storage(
3078 resource_type.to_string(),
3079 id,
3080 version_id,
3081 tenant.tenant_id().clone(),
3082 content,
3083 last_modified, last_modified,
3085 None, fhir_version,
3087 ))
3088 })
3089 .collect();
3090
3091 let next_cursor = if resources.len() == limit as usize {
3093 resources
3094 .last()
3095 .map(|r| format!("{}|{}", r.last_modified().to_rfc3339(), r.id()))
3096 } else {
3097 None
3098 };
3099
3100 Ok(ResourcePage {
3101 resources,
3102 next_cursor,
3103 })
3104 }
3105
3106 async fn delete_search_entries(
3107 &self,
3108 tenant: &TenantContext,
3109 resource_type: &str,
3110 resource_id: &str,
3111 ) -> StorageResult<()> {
3112 let conn = self.get_connection()?;
3113 self.delete_search_index(
3114 &conn,
3115 tenant.tenant_id().as_str(),
3116 resource_type,
3117 resource_id,
3118 )
3119 }
3120
3121 async fn write_search_entries(
3122 &self,
3123 tenant: &TenantContext,
3124 resource_type: &str,
3125 resource_id: &str,
3126 resource: &Value,
3127 ) -> StorageResult<usize> {
3128 let conn = self.get_connection()?;
3129
3130 let values = self
3132 .search_extractor()
3133 .extract(resource, resource_type)
3134 .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
3135
3136 let mut count = 0;
3137 for value in values {
3138 self.write_index_entry(
3139 &conn,
3140 tenant.tenant_id().as_str(),
3141 resource_type,
3142 resource_id,
3143 &value,
3144 )?;
3145 count += 1;
3146 }
3147
3148 count += self.index_contained_resources(
3151 &conn,
3152 tenant.tenant_id().as_str(),
3153 resource_type,
3154 resource_id,
3155 resource,
3156 )?;
3157
3158 Ok(count)
3159 }
3160
3161 async fn clear_search_index(&self, tenant: &TenantContext) -> StorageResult<u64> {
3162 let conn = self.get_connection()?;
3163 let tenant_id = tenant.tenant_id().as_str();
3164
3165 let deleted = conn
3166 .execute(
3167 "DELETE FROM search_index WHERE tenant_id = ?1",
3168 params![tenant_id],
3169 )
3170 .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
3171
3172 Ok(deleted as u64)
3173 }
3174}
3175
3176#[cfg(test)]
3177mod tests {
3178 use super::*;
3179 use crate::core::history::HistoryParams;
3180 use crate::tenant::{TenantId, TenantPermissions};
3181 use serde_json::json;
3182 use std::path::PathBuf;
3183
3184 use crate::backends::sqlite::SqliteBackendConfig;
3185
3186 fn create_test_backend() -> SqliteBackend {
3187 let data_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3190 .parent()
3191 .and_then(|p| p.parent())
3192 .map(|p| p.join("data"))
3193 .unwrap_or_else(|| PathBuf::from("data"));
3194
3195 let config = SqliteBackendConfig {
3196 data_dir: Some(data_dir),
3197 ..Default::default()
3198 };
3199 let backend = SqliteBackend::with_config(":memory:", config).unwrap();
3200 backend.init_schema().unwrap();
3201 backend
3202 }
3203
3204 fn create_test_tenant() -> TenantContext {
3205 TenantContext::new(
3206 TenantId::new("test-tenant"),
3207 TenantPermissions::full_access(),
3208 )
3209 }
3210
3211 #[tokio::test]
3212 async fn test_create_and_read() {
3213 let backend = create_test_backend();
3214 let tenant = create_test_tenant();
3215
3216 let resource = json!({
3217 "resourceType": "Patient",
3218 "name": [{"family": "Test", "given": ["User"]}]
3219 });
3220
3221 let created = backend
3223 .create(&tenant, "Patient", resource, FhirVersion::default())
3224 .await
3225 .unwrap();
3226 assert_eq!(created.resource_type(), "Patient");
3227 assert_eq!(created.version_id(), "1");
3228
3229 let read = backend
3231 .read(&tenant, "Patient", created.id())
3232 .await
3233 .unwrap();
3234 assert!(read.is_some());
3235 let read = read.unwrap();
3236 assert_eq!(read.version_id(), "1");
3237 }
3238
3239 #[tokio::test]
3240 async fn test_create_with_id() {
3241 let backend = create_test_backend();
3242 let tenant = create_test_tenant();
3243
3244 let resource = json!({
3245 "resourceType": "Patient",
3246 "id": "patient-123",
3247 "name": [{"family": "Test"}]
3248 });
3249
3250 let created = backend
3251 .create(&tenant, "Patient", resource, FhirVersion::default())
3252 .await
3253 .unwrap();
3254 assert_eq!(created.id(), "patient-123");
3255 }
3256
3257 #[tokio::test]
3258 async fn test_create_duplicate_fails() {
3259 let backend = create_test_backend();
3260 let tenant = create_test_tenant();
3261
3262 let resource = json!({"id": "patient-1"});
3263 backend
3264 .create(&tenant, "Patient", resource.clone(), FhirVersion::default())
3265 .await
3266 .unwrap();
3267
3268 let result = backend
3269 .create(&tenant, "Patient", resource, FhirVersion::default())
3270 .await;
3271 assert!(matches!(
3272 result,
3273 Err(StorageError::Resource(ResourceError::AlreadyExists { .. }))
3274 ));
3275 }
3276
3277 #[tokio::test]
3278 async fn test_read_nonexistent() {
3279 let backend = create_test_backend();
3280 let tenant = create_test_tenant();
3281
3282 let result = backend
3283 .read(&tenant, "Patient", "nonexistent")
3284 .await
3285 .unwrap();
3286 assert!(result.is_none());
3287 }
3288
3289 #[tokio::test]
3290 async fn test_update() {
3291 let backend = create_test_backend();
3292 let tenant = create_test_tenant();
3293
3294 let resource = json!({"name": [{"family": "Original"}]});
3296 let created = backend
3297 .create(&tenant, "Patient", resource, FhirVersion::default())
3298 .await
3299 .unwrap();
3300
3301 let updated_content = json!({"name": [{"family": "Updated"}]});
3303 let updated = backend
3304 .update(&tenant, &created, updated_content)
3305 .await
3306 .unwrap();
3307 assert_eq!(updated.version_id(), "2");
3308
3309 let read = backend
3311 .read(&tenant, "Patient", created.id())
3312 .await
3313 .unwrap()
3314 .unwrap();
3315 assert_eq!(read.content()["name"][0]["family"], "Updated");
3316 }
3317
3318 #[tokio::test]
3319 async fn test_update_version_conflict() {
3320 let backend = create_test_backend();
3321 let tenant = create_test_tenant();
3322
3323 let resource = json!({});
3325 let created = backend
3326 .create(&tenant, "Patient", resource, FhirVersion::default())
3327 .await
3328 .unwrap();
3329
3330 let _ = backend.update(&tenant, &created, json!({})).await.unwrap();
3332
3333 let result = backend.update(&tenant, &created, json!({})).await;
3335 assert!(matches!(
3336 result,
3337 Err(StorageError::Concurrency(
3338 ConcurrencyError::VersionConflict { .. }
3339 ))
3340 ));
3341 }
3342
3343 #[tokio::test]
3344 async fn test_delete() {
3345 let backend = create_test_backend();
3346 let tenant = create_test_tenant();
3347
3348 let resource = json!({});
3350 let created = backend
3351 .create(&tenant, "Patient", resource, FhirVersion::default())
3352 .await
3353 .unwrap();
3354
3355 backend
3357 .delete(&tenant, "Patient", created.id())
3358 .await
3359 .unwrap();
3360
3361 let result = backend.read(&tenant, "Patient", created.id()).await;
3363 assert!(matches!(
3364 result,
3365 Err(StorageError::Resource(ResourceError::Gone { .. }))
3366 ));
3367 }
3368
3369 #[tokio::test]
3370 async fn test_create_or_update_new() {
3371 let backend = create_test_backend();
3372 let tenant = create_test_tenant();
3373
3374 let (resource, created) = backend
3375 .create_or_update(
3376 &tenant,
3377 "Patient",
3378 "new-id",
3379 json!({}),
3380 FhirVersion::default(),
3381 )
3382 .await
3383 .unwrap();
3384
3385 assert!(created);
3386 assert_eq!(resource.id(), "new-id");
3387 assert_eq!(resource.version_id(), "1");
3388 }
3389
3390 #[tokio::test]
3391 async fn test_create_or_update_existing() {
3392 let backend = create_test_backend();
3393 let tenant = create_test_tenant();
3394
3395 backend
3397 .create(
3398 &tenant,
3399 "Patient",
3400 json!({"id": "existing-id"}),
3401 FhirVersion::default(),
3402 )
3403 .await
3404 .unwrap();
3405
3406 let (resource, created) = backend
3408 .create_or_update(
3409 &tenant,
3410 "Patient",
3411 "existing-id",
3412 json!({}),
3413 FhirVersion::default(),
3414 )
3415 .await
3416 .unwrap();
3417
3418 assert!(!created);
3419 assert_eq!(resource.version_id(), "2");
3420 }
3421
3422 #[tokio::test]
3423 async fn test_count() {
3424 let backend = create_test_backend();
3425 let tenant = create_test_tenant();
3426
3427 assert_eq!(backend.count(&tenant, Some("Patient")).await.unwrap(), 0);
3429
3430 backend
3432 .create(&tenant, "Patient", json!({}), FhirVersion::default())
3433 .await
3434 .unwrap();
3435 backend
3436 .create(&tenant, "Patient", json!({}), FhirVersion::default())
3437 .await
3438 .unwrap();
3439 backend
3440 .create(&tenant, "Observation", json!({}), FhirVersion::default())
3441 .await
3442 .unwrap();
3443
3444 assert_eq!(backend.count(&tenant, Some("Patient")).await.unwrap(), 2);
3445 assert_eq!(
3446 backend.count(&tenant, Some("Observation")).await.unwrap(),
3447 1
3448 );
3449 assert_eq!(backend.count(&tenant, None).await.unwrap(), 3);
3450 }
3451
3452 #[tokio::test]
3453 async fn test_tenant_isolation() {
3454 let backend = create_test_backend();
3455
3456 let tenant1 =
3457 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
3458 let tenant2 =
3459 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
3460
3461 let resource = json!({"id": "patient-1"});
3463 backend
3464 .create(&tenant1, "Patient", resource, FhirVersion::default())
3465 .await
3466 .unwrap();
3467
3468 assert!(
3470 backend
3471 .read(&tenant1, "Patient", "patient-1")
3472 .await
3473 .unwrap()
3474 .is_some()
3475 );
3476
3477 assert!(
3479 backend
3480 .read(&tenant2, "Patient", "patient-1")
3481 .await
3482 .unwrap()
3483 .is_none()
3484 );
3485 }
3486
3487 #[tokio::test]
3492 async fn test_history_instance_basic() {
3493 let backend = create_test_backend();
3494 let tenant = create_test_tenant();
3495
3496 let resource = json!({"name": [{"family": "Smith"}]});
3498 let created = backend
3499 .create(&tenant, "Patient", resource, FhirVersion::default())
3500 .await
3501 .unwrap();
3502
3503 let v2 = backend
3505 .update(&tenant, &created, json!({"name": [{"family": "Jones"}]}))
3506 .await
3507 .unwrap();
3508 let _v3 = backend
3509 .update(&tenant, &v2, json!({"name": [{"family": "Brown"}]}))
3510 .await
3511 .unwrap();
3512
3513 let params = HistoryParams::new();
3515 let history = backend
3516 .history_instance(&tenant, "Patient", created.id(), ¶ms)
3517 .await
3518 .unwrap();
3519
3520 assert_eq!(history.items.len(), 3);
3522 assert_eq!(history.items[0].resource.version_id(), "3");
3523 assert_eq!(history.items[1].resource.version_id(), "2");
3524 assert_eq!(history.items[2].resource.version_id(), "1");
3525
3526 assert_eq!(history.items[0].method, HistoryMethod::Put);
3528 assert_eq!(history.items[1].method, HistoryMethod::Put);
3529 assert_eq!(history.items[2].method, HistoryMethod::Post);
3530 }
3531
3532 #[tokio::test]
3533 async fn test_history_instance_count() {
3534 let backend = create_test_backend();
3535 let tenant = create_test_tenant();
3536
3537 let resource = json!({});
3539 let created = backend
3540 .create(&tenant, "Patient", resource, FhirVersion::default())
3541 .await
3542 .unwrap();
3543 let v2 = backend.update(&tenant, &created, json!({})).await.unwrap();
3544 let _v3 = backend.update(&tenant, &v2, json!({})).await.unwrap();
3545
3546 let count = backend
3547 .history_instance_count(&tenant, "Patient", created.id())
3548 .await
3549 .unwrap();
3550 assert_eq!(count, 3);
3551 }
3552
3553 #[tokio::test]
3554 async fn test_history_instance_with_delete() {
3555 let backend = create_test_backend();
3556 let tenant = create_test_tenant();
3557
3558 let resource = json!({"id": "p1"});
3560 let created = backend
3561 .create(&tenant, "Patient", resource, FhirVersion::default())
3562 .await
3563 .unwrap();
3564 let _v2 = backend
3565 .update(&tenant, &created, json!({"id": "p1"}))
3566 .await
3567 .unwrap();
3568 backend.delete(&tenant, "Patient", "p1").await.unwrap();
3569
3570 let params = HistoryParams::new().include_deleted(true);
3572 let history = backend
3573 .history_instance(&tenant, "Patient", "p1", ¶ms)
3574 .await
3575 .unwrap();
3576
3577 assert_eq!(history.items.len(), 3);
3578 assert_eq!(history.items[0].method, HistoryMethod::Delete);
3579 assert_eq!(history.items[0].resource.version_id(), "3");
3580 }
3581
3582 #[tokio::test]
3583 async fn test_history_instance_exclude_deleted() {
3584 let backend = create_test_backend();
3585 let tenant = create_test_tenant();
3586
3587 let resource = json!({"id": "p2"});
3589 let created = backend
3590 .create(&tenant, "Patient", resource, FhirVersion::default())
3591 .await
3592 .unwrap();
3593 let _v2 = backend
3594 .update(&tenant, &created, json!({"id": "p2"}))
3595 .await
3596 .unwrap();
3597 backend.delete(&tenant, "Patient", "p2").await.unwrap();
3598
3599 let params = HistoryParams::new().include_deleted(false);
3601 let history = backend
3602 .history_instance(&tenant, "Patient", "p2", ¶ms)
3603 .await
3604 .unwrap();
3605
3606 assert_eq!(history.items.len(), 2);
3608 assert_eq!(history.items[0].resource.version_id(), "2");
3609 assert_eq!(history.items[1].resource.version_id(), "1");
3610 }
3611
3612 #[tokio::test]
3613 async fn test_history_instance_pagination() {
3614 let backend = create_test_backend();
3615 let tenant = create_test_tenant();
3616
3617 let resource = json!({});
3619 let mut current = backend
3620 .create(&tenant, "Patient", resource, FhirVersion::default())
3621 .await
3622 .unwrap();
3623 for _ in 0..4 {
3624 current = backend.update(&tenant, ¤t, json!({})).await.unwrap();
3625 }
3626 let params = HistoryParams::new().count(2);
3630 let page1 = backend
3631 .history_instance(&tenant, "Patient", current.id(), ¶ms)
3632 .await
3633 .unwrap();
3634
3635 assert_eq!(page1.items.len(), 2);
3636 assert_eq!(page1.items[0].resource.version_id(), "5");
3637 assert_eq!(page1.items[1].resource.version_id(), "4");
3638 assert!(page1.page_info.has_next);
3639 }
3640
3641 #[tokio::test]
3642 async fn test_history_instance_nonexistent() {
3643 let backend = create_test_backend();
3644 let tenant = create_test_tenant();
3645
3646 let params = HistoryParams::new();
3647 let history = backend
3648 .history_instance(&tenant, "Patient", "nonexistent", ¶ms)
3649 .await
3650 .unwrap();
3651
3652 assert!(history.items.is_empty());
3653 }
3654
3655 #[tokio::test]
3656 async fn test_history_instance_tenant_isolation() {
3657 let backend = create_test_backend();
3658 let tenant1 =
3659 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
3660 let tenant2 =
3661 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
3662
3663 let resource = json!({"id": "shared-id"});
3665 let created = backend
3666 .create(&tenant1, "Patient", resource, FhirVersion::default())
3667 .await
3668 .unwrap();
3669 let _v2 = backend
3670 .update(&tenant1, &created, json!({"id": "shared-id"}))
3671 .await
3672 .unwrap();
3673
3674 let history1 = backend
3676 .history_instance(&tenant1, "Patient", "shared-id", &HistoryParams::new())
3677 .await
3678 .unwrap();
3679 assert_eq!(history1.items.len(), 2);
3680
3681 let history2 = backend
3683 .history_instance(&tenant2, "Patient", "shared-id", &HistoryParams::new())
3684 .await
3685 .unwrap();
3686 assert!(history2.items.is_empty());
3687 }
3688
3689 #[tokio::test]
3694 async fn test_history_type_basic() {
3695 let backend = create_test_backend();
3696 let tenant = create_test_tenant();
3697
3698 let p1 = backend
3700 .create(
3701 &tenant,
3702 "Patient",
3703 json!({"id": "p1"}),
3704 FhirVersion::default(),
3705 )
3706 .await
3707 .unwrap();
3708 let _p2 = backend
3709 .create(
3710 &tenant,
3711 "Patient",
3712 json!({"id": "p2"}),
3713 FhirVersion::default(),
3714 )
3715 .await
3716 .unwrap();
3717
3718 let _p1_v2 = backend
3720 .update(&tenant, &p1, json!({"id": "p1"}))
3721 .await
3722 .unwrap();
3723
3724 let params = HistoryParams::new();
3726 let history = backend
3727 .history_type(&tenant, "Patient", ¶ms)
3728 .await
3729 .unwrap();
3730
3731 assert_eq!(history.items.len(), 3);
3733
3734 for entry in &history.items {
3736 assert_eq!(entry.resource.resource_type(), "Patient");
3737 }
3738 }
3739
3740 #[tokio::test]
3741 async fn test_history_type_count() {
3742 let backend = create_test_backend();
3743 let tenant = create_test_tenant();
3744
3745 let p1 = backend
3747 .create(&tenant, "Patient", json!({}), FhirVersion::default())
3748 .await
3749 .unwrap();
3750 let _p1_v2 = backend.update(&tenant, &p1, json!({})).await.unwrap();
3751 let _p2 = backend
3752 .create(&tenant, "Patient", json!({}), FhirVersion::default())
3753 .await
3754 .unwrap();
3755
3756 backend
3758 .create(&tenant, "Observation", json!({}), FhirVersion::default())
3759 .await
3760 .unwrap();
3761
3762 let count = backend
3764 .history_type_count(&tenant, "Patient")
3765 .await
3766 .unwrap();
3767 assert_eq!(count, 3); let obs_count = backend
3771 .history_type_count(&tenant, "Observation")
3772 .await
3773 .unwrap();
3774 assert_eq!(obs_count, 1);
3775 }
3776
3777 #[tokio::test]
3778 async fn test_history_type_filters_by_type() {
3779 let backend = create_test_backend();
3780 let tenant = create_test_tenant();
3781
3782 backend
3784 .create(&tenant, "Patient", json!({}), FhirVersion::default())
3785 .await
3786 .unwrap();
3787 backend
3788 .create(&tenant, "Observation", json!({}), FhirVersion::default())
3789 .await
3790 .unwrap();
3791 backend
3792 .create(&tenant, "Encounter", json!({}), FhirVersion::default())
3793 .await
3794 .unwrap();
3795
3796 let history = backend
3798 .history_type(&tenant, "Patient", &HistoryParams::new())
3799 .await
3800 .unwrap();
3801 assert_eq!(history.items.len(), 1);
3802 assert_eq!(history.items[0].resource.resource_type(), "Patient");
3803
3804 let obs_history = backend
3806 .history_type(&tenant, "Observation", &HistoryParams::new())
3807 .await
3808 .unwrap();
3809 assert_eq!(obs_history.items.len(), 1);
3810 assert_eq!(obs_history.items[0].resource.resource_type(), "Observation");
3811 }
3812
3813 #[tokio::test]
3814 async fn test_history_type_includes_deleted() {
3815 let backend = create_test_backend();
3816 let tenant = create_test_tenant();
3817
3818 let _p1 = backend
3820 .create(
3821 &tenant,
3822 "Patient",
3823 json!({"id": "del-p1"}),
3824 FhirVersion::default(),
3825 )
3826 .await
3827 .unwrap();
3828 backend.delete(&tenant, "Patient", "del-p1").await.unwrap();
3829
3830 backend
3832 .create(
3833 &tenant,
3834 "Patient",
3835 json!({"id": "p2"}),
3836 FhirVersion::default(),
3837 )
3838 .await
3839 .unwrap();
3840
3841 let history = backend
3843 .history_type(&tenant, "Patient", &HistoryParams::new())
3844 .await
3845 .unwrap();
3846 assert_eq!(history.items.len(), 2); let history_with_deleted = backend
3850 .history_type(
3851 &tenant,
3852 "Patient",
3853 &HistoryParams::new().include_deleted(true),
3854 )
3855 .await
3856 .unwrap();
3857 assert_eq!(history_with_deleted.items.len(), 3); }
3859
3860 #[tokio::test]
3861 async fn test_history_type_tenant_isolation() {
3862 let backend = create_test_backend();
3863 let tenant1 =
3864 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
3865 let tenant2 =
3866 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
3867
3868 backend
3870 .create(&tenant1, "Patient", json!({}), FhirVersion::default())
3871 .await
3872 .unwrap();
3873 backend
3874 .create(&tenant1, "Patient", json!({}), FhirVersion::default())
3875 .await
3876 .unwrap();
3877
3878 backend
3880 .create(&tenant2, "Patient", json!({}), FhirVersion::default())
3881 .await
3882 .unwrap();
3883
3884 let history1 = backend
3886 .history_type(&tenant1, "Patient", &HistoryParams::new())
3887 .await
3888 .unwrap();
3889 assert_eq!(history1.items.len(), 2);
3890
3891 let history2 = backend
3893 .history_type(&tenant2, "Patient", &HistoryParams::new())
3894 .await
3895 .unwrap();
3896 assert_eq!(history2.items.len(), 1);
3897 }
3898
3899 #[tokio::test]
3900 async fn test_history_type_pagination() {
3901 let backend = create_test_backend();
3902 let tenant = create_test_tenant();
3903
3904 for i in 0..5 {
3906 backend
3907 .create(
3908 &tenant,
3909 "Patient",
3910 json!({"id": format!("p{}", i)}),
3911 FhirVersion::default(),
3912 )
3913 .await
3914 .unwrap();
3915 }
3916
3917 let params = HistoryParams::new().count(2);
3919 let page1 = backend
3920 .history_type(&tenant, "Patient", ¶ms)
3921 .await
3922 .unwrap();
3923
3924 assert_eq!(page1.items.len(), 2);
3925 assert!(page1.page_info.has_next);
3926 }
3927
3928 #[tokio::test]
3929 async fn test_history_type_empty() {
3930 let backend = create_test_backend();
3931 let tenant = create_test_tenant();
3932
3933 let history = backend
3935 .history_type(&tenant, "Patient", &HistoryParams::new())
3936 .await
3937 .unwrap();
3938 assert!(history.items.is_empty());
3939 assert!(!history.page_info.has_next);
3940 }
3941
3942 #[tokio::test]
3947 async fn test_history_system_basic() {
3948 let backend = create_test_backend();
3949 let tenant = create_test_tenant();
3950
3951 let p1 = backend
3953 .create(
3954 &tenant,
3955 "Patient",
3956 json!({"id": "p1"}),
3957 FhirVersion::default(),
3958 )
3959 .await
3960 .unwrap();
3961 backend
3962 .create(
3963 &tenant,
3964 "Observation",
3965 json!({"id": "o1"}),
3966 FhirVersion::default(),
3967 )
3968 .await
3969 .unwrap();
3970 backend
3971 .create(
3972 &tenant,
3973 "Encounter",
3974 json!({"id": "e1"}),
3975 FhirVersion::default(),
3976 )
3977 .await
3978 .unwrap();
3979
3980 let _p1_v2 = backend
3982 .update(&tenant, &p1, json!({"id": "p1"}))
3983 .await
3984 .unwrap();
3985
3986 let history = backend
3988 .history_system(&tenant, &HistoryParams::new())
3989 .await
3990 .unwrap();
3991
3992 assert_eq!(history.items.len(), 4);
3994
3995 let types: std::collections::HashSet<_> = history
3997 .items
3998 .iter()
3999 .map(|e| e.resource.resource_type())
4000 .collect();
4001 assert!(types.contains("Patient"));
4002 assert!(types.contains("Observation"));
4003 assert!(types.contains("Encounter"));
4004 }
4005
4006 #[tokio::test]
4007 async fn test_history_system_count() {
4008 let backend = create_test_backend();
4009 let tenant = create_test_tenant();
4010
4011 let p1 = backend
4013 .create(&tenant, "Patient", json!({}), FhirVersion::default())
4014 .await
4015 .unwrap();
4016 let _p1_v2 = backend.update(&tenant, &p1, json!({})).await.unwrap();
4017 backend
4018 .create(&tenant, "Observation", json!({}), FhirVersion::default())
4019 .await
4020 .unwrap();
4021 backend
4022 .create(&tenant, "Encounter", json!({}), FhirVersion::default())
4023 .await
4024 .unwrap();
4025
4026 let count = backend.history_system_count(&tenant).await.unwrap();
4028 assert_eq!(count, 4); }
4030
4031 #[tokio::test]
4032 async fn test_history_system_includes_deleted() {
4033 let backend = create_test_backend();
4034 let tenant = create_test_tenant();
4035
4036 backend
4038 .create(
4039 &tenant,
4040 "Patient",
4041 json!({"id": "del-p1"}),
4042 FhirVersion::default(),
4043 )
4044 .await
4045 .unwrap();
4046 backend.delete(&tenant, "Patient", "del-p1").await.unwrap();
4047
4048 backend
4050 .create(&tenant, "Observation", json!({}), FhirVersion::default())
4051 .await
4052 .unwrap();
4053
4054 let history = backend
4056 .history_system(&tenant, &HistoryParams::new())
4057 .await
4058 .unwrap();
4059 assert_eq!(history.items.len(), 2); let history_with_deleted = backend
4063 .history_system(&tenant, &HistoryParams::new().include_deleted(true))
4064 .await
4065 .unwrap();
4066 assert_eq!(history_with_deleted.items.len(), 3); }
4068
4069 #[tokio::test]
4070 async fn test_history_system_tenant_isolation() {
4071 let backend = create_test_backend();
4072 let tenant1 =
4073 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
4074 let tenant2 =
4075 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
4076
4077 backend
4079 .create(&tenant1, "Patient", json!({}), FhirVersion::default())
4080 .await
4081 .unwrap();
4082 backend
4083 .create(&tenant1, "Observation", json!({}), FhirVersion::default())
4084 .await
4085 .unwrap();
4086
4087 backend
4089 .create(&tenant2, "Encounter", json!({}), FhirVersion::default())
4090 .await
4091 .unwrap();
4092
4093 let history1 = backend
4095 .history_system(&tenant1, &HistoryParams::new())
4096 .await
4097 .unwrap();
4098 assert_eq!(history1.items.len(), 2);
4099
4100 let history2 = backend
4102 .history_system(&tenant2, &HistoryParams::new())
4103 .await
4104 .unwrap();
4105 assert_eq!(history2.items.len(), 1);
4106
4107 assert_eq!(backend.history_system_count(&tenant1).await.unwrap(), 2);
4109 assert_eq!(backend.history_system_count(&tenant2).await.unwrap(), 1);
4110 }
4111
4112 #[tokio::test]
4113 async fn test_history_system_pagination() {
4114 let backend = create_test_backend();
4115 let tenant = create_test_tenant();
4116
4117 for i in 0..3 {
4119 backend
4120 .create(
4121 &tenant,
4122 "Patient",
4123 json!({"id": format!("p{}", i)}),
4124 FhirVersion::default(),
4125 )
4126 .await
4127 .unwrap();
4128 }
4129 for i in 0..2 {
4130 backend
4131 .create(
4132 &tenant,
4133 "Observation",
4134 json!({"id": format!("o{}", i)}),
4135 FhirVersion::default(),
4136 )
4137 .await
4138 .unwrap();
4139 }
4140 let params = HistoryParams::new().count(2);
4144 let page1 = backend.history_system(&tenant, ¶ms).await.unwrap();
4145
4146 assert_eq!(page1.items.len(), 2);
4147 assert!(page1.page_info.has_next);
4148 }
4149
4150 #[tokio::test]
4151 async fn test_history_system_empty() {
4152 let backend = create_test_backend();
4153 let tenant = create_test_tenant();
4154
4155 let history = backend
4157 .history_system(&tenant, &HistoryParams::new())
4158 .await
4159 .unwrap();
4160 assert!(history.items.is_empty());
4161 assert!(!history.page_info.has_next);
4162
4163 assert_eq!(backend.history_system_count(&tenant).await.unwrap(), 0);
4164 }
4165
4166 #[tokio::test]
4167 async fn test_history_system_ordered_by_time() {
4168 let backend = create_test_backend();
4169 let tenant = create_test_tenant();
4170
4171 backend
4173 .create(
4174 &tenant,
4175 "Patient",
4176 json!({"id": "first"}),
4177 FhirVersion::default(),
4178 )
4179 .await
4180 .unwrap();
4181 backend
4182 .create(
4183 &tenant,
4184 "Observation",
4185 json!({"id": "second"}),
4186 FhirVersion::default(),
4187 )
4188 .await
4189 .unwrap();
4190 backend
4191 .create(
4192 &tenant,
4193 "Encounter",
4194 json!({"id": "third"}),
4195 FhirVersion::default(),
4196 )
4197 .await
4198 .unwrap();
4199
4200 let history = backend
4201 .history_system(&tenant, &HistoryParams::new())
4202 .await
4203 .unwrap();
4204
4205 assert_eq!(history.items.len(), 3);
4207 assert_eq!(history.items[0].resource.id(), "third");
4209 assert_eq!(history.items[1].resource.id(), "second");
4210 assert_eq!(history.items[2].resource.id(), "first");
4211 }
4212
4213 #[tokio::test]
4218 async fn test_delete_instance_history() {
4219 let backend = create_test_backend();
4220 let tenant = create_test_tenant();
4221
4222 let p1 = backend
4224 .create(
4225 &tenant,
4226 "Patient",
4227 json!({"id": "p1", "name": [{"family": "Smith"}]}),
4228 FhirVersion::default(),
4229 )
4230 .await
4231 .unwrap();
4232 let p1_v2 = backend
4233 .update(
4234 &tenant,
4235 &p1,
4236 json!({"id": "p1", "name": [{"family": "Jones"}]}),
4237 )
4238 .await
4239 .unwrap();
4240 let _p1_v3 = backend
4241 .update(
4242 &tenant,
4243 &p1_v2,
4244 json!({"id": "p1", "name": [{"family": "Brown"}]}),
4245 )
4246 .await
4247 .unwrap();
4248
4249 let history = backend
4251 .history_instance(&tenant, "Patient", "p1", &HistoryParams::new())
4252 .await
4253 .unwrap();
4254 assert_eq!(history.items.len(), 3);
4255
4256 let deleted_count = backend
4258 .delete_instance_history(&tenant, "Patient", "p1")
4259 .await
4260 .unwrap();
4261 assert_eq!(deleted_count, 2); let history = backend
4265 .history_instance(&tenant, "Patient", "p1", &HistoryParams::new())
4266 .await
4267 .unwrap();
4268 assert_eq!(history.items.len(), 1);
4269 assert_eq!(history.items[0].resource.version_id(), "3");
4270
4271 let resource = backend.read(&tenant, "Patient", "p1").await.unwrap();
4273 assert!(resource.is_some());
4274 assert_eq!(resource.unwrap().version_id(), "3");
4275 }
4276
4277 #[tokio::test]
4278 async fn test_delete_instance_history_nonexistent() {
4279 let backend = create_test_backend();
4280 let tenant = create_test_tenant();
4281
4282 let result = backend
4284 .delete_instance_history(&tenant, "Patient", "nonexistent")
4285 .await;
4286
4287 assert!(matches!(
4288 result,
4289 Err(StorageError::Resource(ResourceError::NotFound { .. }))
4290 ));
4291 }
4292
4293 #[tokio::test]
4294 async fn test_delete_version() {
4295 let backend = create_test_backend();
4296 let tenant = create_test_tenant();
4297
4298 let p1 = backend
4300 .create(
4301 &tenant,
4302 "Patient",
4303 json!({"id": "p1", "name": [{"family": "Smith"}]}),
4304 FhirVersion::default(),
4305 )
4306 .await
4307 .unwrap();
4308 let p1_v2 = backend
4309 .update(
4310 &tenant,
4311 &p1,
4312 json!({"id": "p1", "name": [{"family": "Jones"}]}),
4313 )
4314 .await
4315 .unwrap();
4316 let _p1_v3 = backend
4317 .update(
4318 &tenant,
4319 &p1_v2,
4320 json!({"id": "p1", "name": [{"family": "Brown"}]}),
4321 )
4322 .await
4323 .unwrap();
4324
4325 backend
4327 .delete_version(&tenant, "Patient", "p1", "2")
4328 .await
4329 .unwrap();
4330
4331 let history = backend
4333 .history_instance(&tenant, "Patient", "p1", &HistoryParams::new())
4334 .await
4335 .unwrap();
4336 assert_eq!(history.items.len(), 2);
4337 let versions: Vec<&str> = history
4338 .items
4339 .iter()
4340 .map(|e| e.resource.version_id())
4341 .collect();
4342 assert!(versions.contains(&"1"));
4343 assert!(versions.contains(&"3"));
4344 assert!(!versions.contains(&"2"));
4345 }
4346
4347 #[tokio::test]
4348 async fn test_delete_version_current_fails() {
4349 let backend = create_test_backend();
4350 let tenant = create_test_tenant();
4351
4352 let p1 = backend
4354 .create(
4355 &tenant,
4356 "Patient",
4357 json!({"id": "p1"}),
4358 FhirVersion::default(),
4359 )
4360 .await
4361 .unwrap();
4362 let _p1_v2 = backend
4363 .update(&tenant, &p1, json!({"id": "p1"}))
4364 .await
4365 .unwrap();
4366
4367 let result = backend.delete_version(&tenant, "Patient", "p1", "2").await;
4369
4370 assert!(matches!(result, Err(StorageError::Validation(_))));
4372 }
4373
4374 #[tokio::test]
4375 async fn test_delete_version_nonexistent() {
4376 let backend = create_test_backend();
4377 let tenant = create_test_tenant();
4378
4379 backend
4381 .create(
4382 &tenant,
4383 "Patient",
4384 json!({"id": "p1"}),
4385 FhirVersion::default(),
4386 )
4387 .await
4388 .unwrap();
4389
4390 let result = backend
4392 .delete_version(&tenant, "Patient", "p1", "999")
4393 .await;
4394
4395 assert!(matches!(
4396 result,
4397 Err(StorageError::Resource(
4398 ResourceError::VersionNotFound { .. }
4399 ))
4400 ));
4401 }
4402
4403 #[tokio::test]
4404 async fn test_delete_version_resource_not_found() {
4405 let backend = create_test_backend();
4406 let tenant = create_test_tenant();
4407
4408 let result = backend
4410 .delete_version(&tenant, "Patient", "nonexistent", "1")
4411 .await;
4412
4413 assert!(matches!(
4414 result,
4415 Err(StorageError::Resource(ResourceError::NotFound { .. }))
4416 ));
4417 }
4418
4419 #[tokio::test]
4424 async fn test_purge_single_resource() {
4425 let backend = create_test_backend();
4426 let tenant = create_test_tenant();
4427
4428 let p1 = backend
4430 .create(
4431 &tenant,
4432 "Patient",
4433 json!({"id": "p1"}),
4434 FhirVersion::default(),
4435 )
4436 .await
4437 .unwrap();
4438 let _p1_v2 = backend
4439 .update(&tenant, &p1, json!({"id": "p1"}))
4440 .await
4441 .unwrap();
4442
4443 backend.purge(&tenant, "Patient", "p1").await.unwrap();
4445
4446 let read_result = backend.read(&tenant, "Patient", "p1").await.unwrap();
4448 assert!(read_result.is_none());
4449
4450 let history = backend
4452 .history_instance(&tenant, "Patient", "p1", &HistoryParams::new())
4453 .await
4454 .unwrap();
4455 assert!(history.items.is_empty());
4456 }
4457
4458 #[tokio::test]
4459 async fn test_purge_deleted_resource() {
4460 let backend = create_test_backend();
4461 let tenant = create_test_tenant();
4462
4463 backend
4465 .create(
4466 &tenant,
4467 "Patient",
4468 json!({"id": "del-p1"}),
4469 FhirVersion::default(),
4470 )
4471 .await
4472 .unwrap();
4473 backend.delete(&tenant, "Patient", "del-p1").await.unwrap();
4474
4475 backend.purge(&tenant, "Patient", "del-p1").await.unwrap();
4477
4478 let history = backend
4480 .history_instance(
4481 &tenant,
4482 "Patient",
4483 "del-p1",
4484 &HistoryParams::new().include_deleted(true),
4485 )
4486 .await
4487 .unwrap();
4488 assert!(history.items.is_empty());
4489 }
4490
4491 #[tokio::test]
4492 async fn test_purge_nonexistent_resource() {
4493 let backend = create_test_backend();
4494 let tenant = create_test_tenant();
4495
4496 let result = backend.purge(&tenant, "Patient", "nonexistent").await;
4498 assert!(result.is_err());
4499 }
4500
4501 #[tokio::test]
4502 async fn test_purge_tenant_isolation() {
4503 let backend = create_test_backend();
4504 let tenant1 =
4505 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
4506 let tenant2 =
4507 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
4508
4509 backend
4511 .create(
4512 &tenant1,
4513 "Patient",
4514 json!({"id": "shared-id"}),
4515 FhirVersion::default(),
4516 )
4517 .await
4518 .unwrap();
4519
4520 backend
4522 .create(
4523 &tenant2,
4524 "Patient",
4525 json!({"id": "shared-id"}),
4526 FhirVersion::default(),
4527 )
4528 .await
4529 .unwrap();
4530
4531 backend
4533 .purge(&tenant1, "Patient", "shared-id")
4534 .await
4535 .unwrap();
4536
4537 let t2_read = backend
4539 .read(&tenant2, "Patient", "shared-id")
4540 .await
4541 .unwrap();
4542 assert!(t2_read.is_some());
4543
4544 let t1_read = backend
4546 .read(&tenant1, "Patient", "shared-id")
4547 .await
4548 .unwrap();
4549 assert!(t1_read.is_none());
4550 }
4551
4552 #[tokio::test]
4553 async fn test_purge_all_single_type() {
4554 let backend = create_test_backend();
4555 let tenant = create_test_tenant();
4556
4557 for i in 0..5 {
4559 backend
4560 .create(
4561 &tenant,
4562 "Patient",
4563 json!({"id": format!("p{}", i)}),
4564 FhirVersion::default(),
4565 )
4566 .await
4567 .unwrap();
4568 }
4569
4570 backend
4572 .create(&tenant, "Observation", json!({}), FhirVersion::default())
4573 .await
4574 .unwrap();
4575
4576 let count = backend.purge_all(&tenant, "Patient").await.unwrap();
4578 assert_eq!(count, 5);
4579
4580 let patient_history = backend
4582 .history_type(&tenant, "Patient", &HistoryParams::new())
4583 .await
4584 .unwrap();
4585 assert!(patient_history.items.is_empty());
4586
4587 let obs_history = backend
4589 .history_type(&tenant, "Observation", &HistoryParams::new())
4590 .await
4591 .unwrap();
4592 assert_eq!(obs_history.items.len(), 1);
4593 }
4594
4595 #[tokio::test]
4596 async fn test_purge_all_empty_type() {
4597 let backend = create_test_backend();
4598 let tenant = create_test_tenant();
4599
4600 let count = backend.purge_all(&tenant, "Patient").await.unwrap();
4602 assert_eq!(count, 0);
4603 }
4604
4605 #[tokio::test]
4606 async fn test_purge_all_tenant_isolation() {
4607 let backend = create_test_backend();
4608 let tenant1 =
4609 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
4610 let tenant2 =
4611 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
4612
4613 for i in 0..3 {
4615 backend
4616 .create(
4617 &tenant1,
4618 "Patient",
4619 json!({"id": format!("t1-p{}", i)}),
4620 FhirVersion::default(),
4621 )
4622 .await
4623 .unwrap();
4624 }
4625 for i in 0..2 {
4626 backend
4627 .create(
4628 &tenant2,
4629 "Patient",
4630 json!({"id": format!("t2-p{}", i)}),
4631 FhirVersion::default(),
4632 )
4633 .await
4634 .unwrap();
4635 }
4636
4637 let count = backend.purge_all(&tenant1, "Patient").await.unwrap();
4639 assert_eq!(count, 3);
4640
4641 let t2_history = backend
4643 .history_type(&tenant2, "Patient", &HistoryParams::new())
4644 .await
4645 .unwrap();
4646 assert_eq!(t2_history.items.len(), 2);
4647 }
4648
4649 #[tokio::test]
4654 async fn test_modified_since_basic() {
4655 let backend = create_test_backend();
4656 let tenant = create_test_tenant();
4657
4658 let before_create = Utc::now();
4660
4661 backend
4663 .create(
4664 &tenant,
4665 "Patient",
4666 json!({"id": "p1"}),
4667 FhirVersion::default(),
4668 )
4669 .await
4670 .unwrap();
4671 backend
4672 .create(
4673 &tenant,
4674 "Patient",
4675 json!({"id": "p2"}),
4676 FhirVersion::default(),
4677 )
4678 .await
4679 .unwrap();
4680 backend
4681 .create(
4682 &tenant,
4683 "Observation",
4684 json!({"id": "o1"}),
4685 FhirVersion::default(),
4686 )
4687 .await
4688 .unwrap();
4689
4690 let pagination = Pagination::default();
4692 let result = backend
4693 .modified_since(&tenant, None, before_create, &pagination)
4694 .await
4695 .unwrap();
4696
4697 assert_eq!(result.items.len(), 3);
4699 }
4700
4701 #[tokio::test]
4702 async fn test_modified_since_with_type_filter() {
4703 let backend = create_test_backend();
4704 let tenant = create_test_tenant();
4705
4706 let before_create = Utc::now();
4707
4708 backend
4710 .create(
4711 &tenant,
4712 "Patient",
4713 json!({"id": "p1"}),
4714 FhirVersion::default(),
4715 )
4716 .await
4717 .unwrap();
4718 backend
4719 .create(
4720 &tenant,
4721 "Patient",
4722 json!({"id": "p2"}),
4723 FhirVersion::default(),
4724 )
4725 .await
4726 .unwrap();
4727 backend
4728 .create(
4729 &tenant,
4730 "Observation",
4731 json!({"id": "o1"}),
4732 FhirVersion::default(),
4733 )
4734 .await
4735 .unwrap();
4736
4737 let pagination = Pagination::default();
4739 let result = backend
4740 .modified_since(&tenant, Some("Patient"), before_create, &pagination)
4741 .await
4742 .unwrap();
4743
4744 assert_eq!(result.items.len(), 2);
4746 for resource in &result.items {
4747 assert_eq!(resource.resource_type(), "Patient");
4748 }
4749 }
4750
4751 #[tokio::test]
4752 async fn test_modified_since_excludes_older() {
4753 let backend = create_test_backend();
4754 let tenant = create_test_tenant();
4755
4756 backend
4758 .create(
4759 &tenant,
4760 "Patient",
4761 json!({"id": "old"}),
4762 FhirVersion::default(),
4763 )
4764 .await
4765 .unwrap();
4766
4767 let after_first = Utc::now();
4769
4770 backend
4772 .create(
4773 &tenant,
4774 "Patient",
4775 json!({"id": "new"}),
4776 FhirVersion::default(),
4777 )
4778 .await
4779 .unwrap();
4780
4781 let pagination = Pagination::default();
4783 let result = backend
4784 .modified_since(&tenant, None, after_first, &pagination)
4785 .await
4786 .unwrap();
4787
4788 assert_eq!(result.items.len(), 1);
4790 assert_eq!(result.items[0].id(), "new");
4791 }
4792
4793 #[tokio::test]
4794 async fn test_modified_since_tenant_isolation() {
4795 let backend = create_test_backend();
4796 let tenant1 =
4797 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
4798 let tenant2 =
4799 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
4800
4801 let before_create = Utc::now();
4802
4803 backend
4805 .create(
4806 &tenant1,
4807 "Patient",
4808 json!({"id": "t1-p1"}),
4809 FhirVersion::default(),
4810 )
4811 .await
4812 .unwrap();
4813 backend
4814 .create(
4815 &tenant2,
4816 "Patient",
4817 json!({"id": "t2-p1"}),
4818 FhirVersion::default(),
4819 )
4820 .await
4821 .unwrap();
4822
4823 let pagination = Pagination::default();
4825 let result1 = backend
4826 .modified_since(&tenant1, None, before_create, &pagination)
4827 .await
4828 .unwrap();
4829 assert_eq!(result1.items.len(), 1);
4830 assert_eq!(result1.items[0].id(), "t1-p1");
4831
4832 let result2 = backend
4834 .modified_since(&tenant2, None, before_create, &pagination)
4835 .await
4836 .unwrap();
4837 assert_eq!(result2.items.len(), 1);
4838 assert_eq!(result2.items[0].id(), "t2-p1");
4839 }
4840
4841 #[tokio::test]
4842 async fn test_modified_since_excludes_deleted() {
4843 let backend = create_test_backend();
4844 let tenant = create_test_tenant();
4845
4846 let before_create = Utc::now();
4847
4848 backend
4850 .create(
4851 &tenant,
4852 "Patient",
4853 json!({"id": "del-p1"}),
4854 FhirVersion::default(),
4855 )
4856 .await
4857 .unwrap();
4858 backend.delete(&tenant, "Patient", "del-p1").await.unwrap();
4859
4860 backend
4862 .create(
4863 &tenant,
4864 "Patient",
4865 json!({"id": "live-p1"}),
4866 FhirVersion::default(),
4867 )
4868 .await
4869 .unwrap();
4870
4871 let pagination = Pagination::default();
4873 let result = backend
4874 .modified_since(&tenant, None, before_create, &pagination)
4875 .await
4876 .unwrap();
4877
4878 assert_eq!(result.items.len(), 1);
4880 assert_eq!(result.items[0].id(), "live-p1");
4881 }
4882
4883 #[tokio::test]
4884 async fn test_modified_since_pagination() {
4885 let backend = create_test_backend();
4886 let tenant = create_test_tenant();
4887
4888 let before_create = Utc::now();
4889
4890 for i in 0..5 {
4892 backend
4893 .create(
4894 &tenant,
4895 "Patient",
4896 json!({"id": format!("p{}", i)}),
4897 FhirVersion::default(),
4898 )
4899 .await
4900 .unwrap();
4901 }
4902
4903 let pagination = Pagination::cursor().with_count(2);
4905 let page1 = backend
4906 .modified_since(&tenant, None, before_create, &pagination)
4907 .await
4908 .unwrap();
4909
4910 assert_eq!(page1.items.len(), 2);
4911 assert!(page1.page_info.has_next);
4912 }
4913
4914 #[tokio::test]
4915 async fn test_modified_since_empty() {
4916 let backend = create_test_backend();
4917 let tenant = create_test_tenant();
4918
4919 let pagination = Pagination::default();
4921 let result = backend
4922 .modified_since(&tenant, None, Utc::now(), &pagination)
4923 .await
4924 .unwrap();
4925
4926 assert!(result.items.is_empty());
4927 assert!(!result.page_info.has_next);
4928 }
4929
4930 #[tokio::test]
4931 async fn test_modified_since_returns_current_version() {
4932 let backend = create_test_backend();
4933 let tenant = create_test_tenant();
4934
4935 let before_create = Utc::now();
4936
4937 let p1 = backend
4939 .create(
4940 &tenant,
4941 "Patient",
4942 json!({"id": "p1", "name": "v1"}),
4943 FhirVersion::default(),
4944 )
4945 .await
4946 .unwrap();
4947 let p1_v2 = backend
4948 .update(&tenant, &p1, json!({"id": "p1", "name": "v2"}))
4949 .await
4950 .unwrap();
4951 let _p1_v3 = backend
4952 .update(&tenant, &p1_v2, json!({"id": "p1", "name": "v3"}))
4953 .await
4954 .unwrap();
4955
4956 let pagination = Pagination::default();
4958 let result = backend
4959 .modified_since(&tenant, None, before_create, &pagination)
4960 .await
4961 .unwrap();
4962
4963 assert_eq!(result.items.len(), 1);
4964 assert_eq!(result.items[0].version_id(), "3");
4965 }
4966
4967 #[tokio::test]
4972 async fn test_conditional_create_no_match() {
4973 let backend = create_test_backend();
4974 let tenant = create_test_tenant();
4975
4976 let result = backend
4978 .conditional_create(
4979 &tenant,
4980 "Patient",
4981 json!({"identifier": [{"value": "12345"}]}),
4982 "identifier=99999", FhirVersion::default(),
4984 )
4985 .await
4986 .unwrap();
4987
4988 match result {
4989 ConditionalCreateResult::Created(resource) => {
4990 assert_eq!(resource.resource_type(), "Patient");
4991 }
4992 _ => panic!("Expected Created result"),
4993 }
4994 }
4995
4996 #[tokio::test]
4997 async fn test_conditional_create_single_match() {
4998 let backend = create_test_backend();
4999 let tenant = create_test_tenant();
5000
5001 let existing = backend
5003 .create(
5004 &tenant,
5005 "Patient",
5006 json!({"id": "p1", "identifier": [{"value": "12345"}]}),
5007 FhirVersion::default(),
5008 )
5009 .await
5010 .unwrap();
5011
5012 let result = backend
5014 .conditional_create(
5015 &tenant,
5016 "Patient",
5017 json!({"identifier": [{"value": "12345"}]}),
5018 "identifier=12345",
5019 FhirVersion::default(),
5020 )
5021 .await
5022 .unwrap();
5023
5024 match result {
5025 ConditionalCreateResult::Exists(resource) => {
5026 assert_eq!(resource.id(), existing.id());
5027 }
5028 _ => panic!("Expected Exists result"),
5029 }
5030 }
5031
5032 #[tokio::test]
5033 async fn test_conditional_create_by_id() {
5034 let backend = create_test_backend();
5035 let tenant = create_test_tenant();
5036
5037 backend
5039 .create(
5040 &tenant,
5041 "Patient",
5042 json!({"id": "p1"}),
5043 FhirVersion::default(),
5044 )
5045 .await
5046 .unwrap();
5047
5048 let result = backend
5050 .conditional_create(
5051 &tenant,
5052 "Patient",
5053 json!({}),
5054 "_id=p1",
5055 FhirVersion::default(),
5056 )
5057 .await
5058 .unwrap();
5059
5060 match result {
5061 ConditionalCreateResult::Exists(resource) => {
5062 assert_eq!(resource.id(), "p1");
5063 }
5064 _ => panic!("Expected Exists result"),
5065 }
5066 }
5067
5068 #[tokio::test]
5069 async fn test_conditional_update_single_match() {
5070 let backend = create_test_backend();
5071 let tenant = create_test_tenant();
5072
5073 backend
5075 .create(
5076 &tenant,
5077 "Patient",
5078 json!({"id": "p1", "identifier": [{"value": "12345"}], "active": false}),
5079 FhirVersion::default(),
5080 )
5081 .await
5082 .unwrap();
5083
5084 let result = backend
5086 .conditional_update(
5087 &tenant,
5088 "Patient",
5089 json!({"id": "p1", "identifier": [{"value": "12345"}], "active": true}),
5090 "identifier=12345",
5091 false,
5092 FhirVersion::default(),
5093 )
5094 .await
5095 .unwrap();
5096
5097 match result {
5098 ConditionalUpdateResult::Updated(resource) => {
5099 assert_eq!(resource.version_id(), "2");
5100 }
5101 _ => panic!("Expected Updated result"),
5102 }
5103 }
5104
5105 #[tokio::test]
5106 async fn test_conditional_update_no_match_no_upsert() {
5107 let backend = create_test_backend();
5108 let tenant = create_test_tenant();
5109
5110 let result = backend
5112 .conditional_update(
5113 &tenant,
5114 "Patient",
5115 json!({"identifier": [{"value": "99999"}]}),
5116 "identifier=99999",
5117 false,
5118 FhirVersion::default(),
5119 )
5120 .await
5121 .unwrap();
5122
5123 match result {
5124 ConditionalUpdateResult::NoMatch => {}
5125 _ => panic!("Expected NoMatch result"),
5126 }
5127 }
5128
5129 #[tokio::test]
5130 async fn test_conditional_update_no_match_with_upsert() {
5131 let backend = create_test_backend();
5132 let tenant = create_test_tenant();
5133
5134 let result = backend
5136 .conditional_update(
5137 &tenant,
5138 "Patient",
5139 json!({"identifier": [{"value": "new-id"}]}),
5140 "identifier=new-id",
5141 true,
5142 FhirVersion::default(),
5143 )
5144 .await
5145 .unwrap();
5146
5147 match result {
5148 ConditionalUpdateResult::Created(resource) => {
5149 assert_eq!(resource.resource_type(), "Patient");
5150 }
5151 _ => panic!("Expected Created result"),
5152 }
5153 }
5154
5155 #[tokio::test]
5156 async fn test_conditional_delete_single_match() {
5157 let backend = create_test_backend();
5158 let tenant = create_test_tenant();
5159
5160 backend
5162 .create(
5163 &tenant,
5164 "Patient",
5165 json!({"id": "p1"}),
5166 FhirVersion::default(),
5167 )
5168 .await
5169 .unwrap();
5170
5171 let result = backend
5173 .conditional_delete(&tenant, "Patient", "_id=p1")
5174 .await
5175 .unwrap();
5176
5177 match result {
5178 ConditionalDeleteResult::Deleted => {
5179 let read_result = backend.read(&tenant, "Patient", "p1").await;
5181 match read_result {
5182 Ok(None) => {} Err(StorageError::Resource(ResourceError::Gone { .. })) => {} other => panic!("Expected None or Gone, got {:?}", other),
5185 }
5186 }
5187 _ => panic!("Expected Deleted result"),
5188 }
5189 }
5190
5191 #[tokio::test]
5192 async fn test_conditional_delete_no_match() {
5193 let backend = create_test_backend();
5194 let tenant = create_test_tenant();
5195
5196 let result = backend
5198 .conditional_delete(&tenant, "Patient", "_id=nonexistent")
5199 .await
5200 .unwrap();
5201
5202 match result {
5203 ConditionalDeleteResult::NoMatch => {}
5204 _ => panic!("Expected NoMatch result"),
5205 }
5206 }
5207
5208 #[tokio::test]
5209 async fn test_conditional_operations_tenant_isolation() {
5210 let backend = create_test_backend();
5211 let tenant1 =
5212 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
5213 let tenant2 =
5214 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
5215
5216 backend
5218 .create(
5219 &tenant1,
5220 "Patient",
5221 json!({"id": "shared-id"}),
5222 FhirVersion::default(),
5223 )
5224 .await
5225 .unwrap();
5226
5227 let result = backend
5229 .conditional_create(
5230 &tenant2,
5231 "Patient",
5232 json!({}),
5233 "_id=shared-id",
5234 FhirVersion::default(),
5235 )
5236 .await
5237 .unwrap();
5238
5239 match result {
5240 ConditionalCreateResult::Created(_) => {}
5241 _ => panic!("Expected Created result (tenant isolation)"),
5242 }
5243 }
5244
5245 #[tokio::test]
5250 async fn test_conditional_patch_json_patch() {
5251 use crate::core::PatchFormat;
5252
5253 let backend = create_test_backend();
5254 let tenant = create_test_tenant();
5255
5256 backend
5258 .create(
5259 &tenant,
5260 "Patient",
5261 json!({"id": "p1", "active": false, "name": [{"family": "Smith"}]}),
5262 FhirVersion::default(),
5263 )
5264 .await
5265 .unwrap();
5266
5267 let patch = PatchFormat::JsonPatch(json!([
5269 {"op": "replace", "path": "/active", "value": true}
5270 ]));
5271
5272 let result = backend
5273 .conditional_patch(&tenant, "Patient", "_id=p1", &patch)
5274 .await
5275 .unwrap();
5276
5277 match result {
5278 crate::core::ConditionalPatchResult::Patched(resource) => {
5279 assert_eq!(resource.content()["active"], json!(true));
5280 }
5281 _ => panic!("Expected Patched result"),
5282 }
5283 }
5284
5285 #[tokio::test]
5286 async fn test_conditional_patch_merge_patch() {
5287 use crate::core::PatchFormat;
5288
5289 let backend = create_test_backend();
5290 let tenant = create_test_tenant();
5291
5292 backend
5294 .create(
5295 &tenant,
5296 "Patient",
5297 json!({"id": "p1", "active": false, "gender": "unknown"}),
5298 FhirVersion::default(),
5299 )
5300 .await
5301 .unwrap();
5302
5303 let patch = PatchFormat::MergePatch(json!({
5305 "active": true,
5306 "gender": null }));
5308
5309 let result = backend
5310 .conditional_patch(&tenant, "Patient", "_id=p1", &patch)
5311 .await
5312 .unwrap();
5313
5314 match result {
5315 crate::core::ConditionalPatchResult::Patched(resource) => {
5316 assert_eq!(resource.content()["active"], json!(true));
5317 assert!(resource.content().get("gender").is_none());
5318 }
5319 _ => panic!("Expected Patched result"),
5320 }
5321 }
5322
5323 #[tokio::test]
5324 async fn test_conditional_patch_no_match() {
5325 use crate::core::PatchFormat;
5326
5327 let backend = create_test_backend();
5328 let tenant = create_test_tenant();
5329
5330 let patch = PatchFormat::JsonPatch(json!([
5331 {"op": "replace", "path": "/active", "value": true}
5332 ]));
5333
5334 let result = backend
5335 .conditional_patch(&tenant, "Patient", "_id=nonexistent", &patch)
5336 .await
5337 .unwrap();
5338
5339 match result {
5340 crate::core::ConditionalPatchResult::NoMatch => {}
5341 _ => panic!("Expected NoMatch result"),
5342 }
5343 }
5344
5345 #[tokio::test]
5350 async fn test_batch_create_multiple() {
5351 use crate::core::transaction::BundleProvider;
5352
5353 let backend = create_test_backend();
5354 let tenant = create_test_tenant();
5355
5356 let entries = vec![
5357 BundleEntry {
5358 method: BundleMethod::Post,
5359 url: "Patient".to_string(),
5360 resource: Some(json!({"resourceType": "Patient", "id": "batch-p1"})),
5361 if_match: None,
5362 if_none_match: None,
5363 if_none_exist: None,
5364 full_url: None,
5365 },
5366 BundleEntry {
5367 method: BundleMethod::Post,
5368 url: "Patient".to_string(),
5369 resource: Some(json!({"resourceType": "Patient", "id": "batch-p2"})),
5370 if_match: None,
5371 if_none_match: None,
5372 if_none_exist: None,
5373 full_url: None,
5374 },
5375 ];
5376
5377 let result = backend.process_batch(&tenant, entries).await.unwrap();
5378
5379 assert_eq!(result.entries.len(), 2);
5380 assert_eq!(result.entries[0].status, 201);
5381 assert_eq!(result.entries[1].status, 201);
5382 }
5383
5384 #[tokio::test]
5385 async fn test_batch_mixed_operations() {
5386 use crate::core::transaction::BundleProvider;
5387
5388 let backend = create_test_backend();
5389 let tenant = create_test_tenant();
5390
5391 backend
5393 .create(
5394 &tenant,
5395 "Patient",
5396 json!({"id": "existing"}),
5397 FhirVersion::default(),
5398 )
5399 .await
5400 .unwrap();
5401
5402 let entries = vec![
5403 BundleEntry {
5405 method: BundleMethod::Get,
5406 url: "Patient/existing".to_string(),
5407 resource: None,
5408 if_match: None,
5409 if_none_match: None,
5410 if_none_exist: None,
5411 full_url: None,
5412 },
5413 BundleEntry {
5415 method: BundleMethod::Post,
5416 url: "Patient".to_string(),
5417 resource: Some(json!({"resourceType": "Patient", "id": "new"})),
5418 if_match: None,
5419 if_none_match: None,
5420 if_none_exist: None,
5421 full_url: None,
5422 },
5423 BundleEntry {
5425 method: BundleMethod::Get,
5426 url: "Patient/nonexistent".to_string(),
5427 resource: None,
5428 if_match: None,
5429 if_none_match: None,
5430 if_none_exist: None,
5431 full_url: None,
5432 },
5433 ];
5434
5435 let result = backend.process_batch(&tenant, entries).await.unwrap();
5436
5437 assert_eq!(result.entries.len(), 3);
5438 assert_eq!(result.entries[0].status, 200); assert_eq!(result.entries[1].status, 201); assert_eq!(result.entries[2].status, 404); }
5442
5443 #[tokio::test]
5444 async fn test_batch_delete() {
5445 use crate::core::transaction::BundleProvider;
5446
5447 let backend = create_test_backend();
5448 let tenant = create_test_tenant();
5449
5450 backend
5452 .create(
5453 &tenant,
5454 "Patient",
5455 json!({"id": "to-delete"}),
5456 FhirVersion::default(),
5457 )
5458 .await
5459 .unwrap();
5460
5461 let entries = vec![BundleEntry {
5462 method: BundleMethod::Delete,
5463 url: "Patient/to-delete".to_string(),
5464 resource: None,
5465 if_match: None,
5466 if_none_match: None,
5467 if_none_exist: None,
5468 full_url: None,
5469 }];
5470
5471 let result = backend.process_batch(&tenant, entries).await.unwrap();
5472
5473 assert_eq!(result.entries.len(), 1);
5474 assert_eq!(result.entries[0].status, 204);
5475
5476 let read_result = backend.read(&tenant, "Patient", "to-delete").await;
5478 match read_result {
5479 Ok(None) => {} Err(StorageError::Resource(ResourceError::Gone { .. })) => {} other => panic!("Expected None or Gone, got {:?}", other),
5482 }
5483 }
5484
5485 #[tokio::test]
5486 async fn test_transaction_all_or_nothing() {
5487 use crate::core::transaction::BundleProvider;
5488
5489 let backend = create_test_backend();
5490 let tenant = create_test_tenant();
5491
5492 backend
5494 .create(
5495 &tenant,
5496 "Patient",
5497 json!({"id": "existing"}),
5498 FhirVersion::default(),
5499 )
5500 .await
5501 .unwrap();
5502
5503 let entries = vec![
5504 BundleEntry {
5506 method: BundleMethod::Post,
5507 url: "Patient".to_string(),
5508 resource: Some(json!({"resourceType": "Patient", "id": "tx-p1"})),
5509 if_match: None,
5510 if_none_match: None,
5511 if_none_exist: None,
5512 full_url: None,
5513 },
5514 BundleEntry {
5516 method: BundleMethod::Post,
5517 url: "Patient".to_string(),
5518 resource: Some(json!({"resourceType": "Patient", "id": "existing"})),
5519 if_match: None,
5520 if_none_match: None,
5521 if_none_exist: None,
5522 full_url: None,
5523 },
5524 ];
5525
5526 let result = backend.process_transaction(&tenant, entries).await;
5527
5528 assert!(result.is_err());
5530
5531 let read = backend.read(&tenant, "Patient", "tx-p1").await.unwrap();
5533 assert!(read.is_none());
5534 }
5535
5536 #[tokio::test]
5537 async fn test_transaction_success() {
5538 use crate::core::transaction::BundleProvider;
5539
5540 let backend = create_test_backend();
5541 let tenant = create_test_tenant();
5542
5543 let entries = vec![
5544 BundleEntry {
5545 method: BundleMethod::Post,
5546 url: "Patient".to_string(),
5547 resource: Some(json!({"resourceType": "Patient", "id": "tx-success-1"})),
5548 if_match: None,
5549 if_none_match: None,
5550 if_none_exist: None,
5551 full_url: None,
5552 },
5553 BundleEntry {
5554 method: BundleMethod::Post,
5555 url: "Observation".to_string(),
5556 resource: Some(json!({"resourceType": "Observation", "id": "tx-success-2"})),
5557 if_match: None,
5558 if_none_match: None,
5559 if_none_exist: None,
5560 full_url: None,
5561 },
5562 ];
5563
5564 let result = backend.process_transaction(&tenant, entries).await.unwrap();
5565
5566 assert_eq!(result.entries.len(), 2);
5567 assert_eq!(result.entries[0].status, 201);
5568 assert_eq!(result.entries[1].status, 201);
5569
5570 assert!(
5572 backend
5573 .read(&tenant, "Patient", "tx-success-1")
5574 .await
5575 .unwrap()
5576 .is_some()
5577 );
5578 assert!(
5579 backend
5580 .read(&tenant, "Observation", "tx-success-2")
5581 .await
5582 .unwrap()
5583 .is_some()
5584 );
5585 }
5586
5587 #[tokio::test]
5588 async fn test_parse_url_formats() {
5589 let backend = create_test_backend();
5590
5591 let (rt, id) = backend.parse_url("Patient/123").unwrap();
5593 assert_eq!(rt, "Patient");
5594 assert_eq!(id, "123");
5595
5596 let (rt, id) = backend.parse_url("/Patient/456").unwrap();
5598 assert_eq!(rt, "Patient");
5599 assert_eq!(id, "456");
5600
5601 let (rt, id) = backend
5603 .parse_url("http://example.com/fhir/Patient/789")
5604 .unwrap();
5605 assert_eq!(rt, "Patient");
5606 assert_eq!(id, "789");
5607
5608 let (rt, id) = backend
5610 .parse_url("https://example.com/fhir/Observation/obs-1")
5611 .unwrap();
5612 assert_eq!(rt, "Observation");
5613 assert_eq!(id, "obs-1");
5614 }
5615
5616 #[tokio::test]
5621 async fn test_search_index_display_text_populated() {
5622 let backend = create_test_backend();
5623 let tenant = create_test_tenant();
5624
5625 backend
5627 .create(
5628 &tenant,
5629 "Observation",
5630 json!({
5631 "resourceType": "Observation",
5632 "id": "obs-display-test",
5633 "code": {
5634 "coding": [
5635 {
5636 "system": "http://loinc.org",
5637 "code": "8867-4",
5638 "display": "Heart rate"
5639 }
5640 ]
5641 },
5642 "status": "final"
5643 }),
5644 FhirVersion::default(),
5645 )
5646 .await
5647 .unwrap();
5648
5649 let conn = backend.get_connection().unwrap();
5651 let mut stmt = conn
5652 .prepare(
5653 "SELECT param_name, value_token_system, value_token_code, value_token_display
5654 FROM search_index
5655 WHERE tenant_id = 'test-tenant'
5656 AND resource_id = 'obs-display-test'
5657 AND param_name = 'code'",
5658 )
5659 .unwrap();
5660
5661 #[allow(clippy::type_complexity)]
5662 let rows: Vec<(String, Option<String>, Option<String>, Option<String>)> = stmt
5663 .query_map([], |row| {
5664 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
5665 })
5666 .unwrap()
5667 .filter_map(|r| r.ok())
5668 .collect();
5669
5670 assert!(
5672 !rows.is_empty(),
5673 "Should have indexed 'code' parameter for Observation"
5674 );
5675
5676 let entry = rows
5678 .iter()
5679 .find(|(_, _, code, _)| code.as_deref() == Some("8867-4"));
5680 assert!(entry.is_some(), "Should have entry with code 8867-4");
5681
5682 let (_, _, _, display) = entry.unwrap();
5684 assert_eq!(
5685 display.as_deref(),
5686 Some("Heart rate"),
5687 "Display text should be 'Heart rate'"
5688 );
5689 }
5690
5691 #[tokio::test]
5692 async fn test_search_index_identifier_type_populated() {
5693 let backend = create_test_backend();
5694 let tenant = create_test_tenant();
5695
5696 backend
5698 .create(
5699 &tenant,
5700 "Patient",
5701 json!({
5702 "resourceType": "Patient",
5703 "id": "patient-type-test",
5704 "identifier": [
5705 {
5706 "type": {
5707 "coding": [
5708 {
5709 "system": "http://terminology.hl7.org/CodeSystem/v2-0203",
5710 "code": "MR"
5711 }
5712 ]
5713 },
5714 "system": "http://hospital.org/mrn",
5715 "value": "MRN12345"
5716 }
5717 ]
5718 }),
5719 FhirVersion::default(),
5720 )
5721 .await
5722 .unwrap();
5723
5724 let conn = backend.get_connection().unwrap();
5726 let mut stmt = conn
5727 .prepare(
5728 "SELECT param_name, value_token_code, value_identifier_type_system, value_identifier_type_code
5729 FROM search_index
5730 WHERE tenant_id = 'test-tenant'
5731 AND resource_id = 'patient-type-test'
5732 AND param_name = 'identifier'",
5733 )
5734 .unwrap();
5735
5736 #[allow(clippy::type_complexity)]
5737 let rows: Vec<(String, Option<String>, Option<String>, Option<String>)> = stmt
5738 .query_map([], |row| {
5739 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
5740 })
5741 .unwrap()
5742 .filter_map(|r| r.ok())
5743 .collect();
5744
5745 assert!(
5747 !rows.is_empty(),
5748 "Should have indexed 'identifier' parameter for Patient"
5749 );
5750
5751 let entry = rows
5753 .iter()
5754 .find(|(_, code, _, _)| code.as_deref() == Some("MRN12345"));
5755 assert!(entry.is_some(), "Should have entry with value MRN12345");
5756
5757 let (_, _, type_system, type_code) = entry.unwrap();
5759 assert_eq!(
5760 type_system.as_deref(),
5761 Some("http://terminology.hl7.org/CodeSystem/v2-0203"),
5762 "Identifier type system should be populated"
5763 );
5764 assert_eq!(
5765 type_code.as_deref(),
5766 Some("MR"),
5767 "Identifier type code should be populated"
5768 );
5769 }
5770}