1use async_trait::async_trait;
4use chrono::Utc;
5use helios_fhir::FhirVersion;
6use rusqlite::{ToSql, params};
7use serde_json::Value;
8
9use crate::core::history::{
10 DifferentialHistoryProvider, HistoryEntry, HistoryMethod, HistoryPage, HistoryParams,
11 InstanceHistoryProvider, SystemHistoryProvider, TypeHistoryProvider,
12};
13use crate::core::transaction::{
14 BundleEntry, BundleEntryResult, BundleMethod, BundleProvider, BundleResult, BundleType,
15};
16use crate::core::{
17 ConditionalCreateResult, ConditionalDeleteResult, ConditionalStorage, ConditionalUpdateResult,
18 PurgableStorage, ResourceStorage, SearchProvider, VersionedStorage,
19};
20use crate::error::TransactionError;
21use crate::error::{BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult};
22use crate::search::extractor::ExtractedValue;
23use crate::search::loader::SearchParameterLoader;
24use crate::search::registry::SearchParameterStatus;
25use crate::search::reindex::{ReindexableStorage, ResourcePage};
26use crate::tenant::TenantContext;
27use crate::types::Pagination;
28use crate::types::{CursorValue, Page, PageCursor, PageInfo, StoredResource};
29use crate::types::{SearchParamType, SearchParameter, SearchQuery, SearchValue};
30
31use super::SqliteBackend;
32use super::search::writer::SqliteSearchIndexWriter;
33
34fn internal_error(message: String) -> StorageError {
35 StorageError::Backend(BackendError::Internal {
36 backend_name: "sqlite".to_string(),
37 message,
38 source: None,
39 })
40}
41
42fn serialization_error(message: String) -> StorageError {
43 StorageError::Backend(BackendError::SerializationError { message })
44}
45
46#[async_trait]
47impl ResourceStorage for SqliteBackend {
48 fn backend_name(&self) -> &'static str {
49 "sqlite"
50 }
51
52 async fn create(
53 &self,
54 tenant: &TenantContext,
55 resource_type: &str,
56 resource: Value,
57 fhir_version: FhirVersion,
58 ) -> StorageResult<StoredResource> {
59 let conn = self.get_connection()?;
60 let tenant_id = tenant.tenant_id().as_str();
61
62 let id = resource
64 .get("id")
65 .and_then(|v| v.as_str())
66 .map(String::from)
67 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
68
69 let exists: bool = conn
71 .query_row(
72 "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
73 params![tenant_id, resource_type, id],
74 |_| Ok(true),
75 )
76 .unwrap_or(false);
77
78 if exists {
79 return Err(StorageError::Resource(ResourceError::AlreadyExists {
80 resource_type: resource_type.to_string(),
81 id: id.clone(),
82 }));
83 }
84
85 let mut resource = resource;
87 if let Some(obj) = resource.as_object_mut() {
88 obj.insert(
89 "resourceType".to_string(),
90 Value::String(resource_type.to_string()),
91 );
92 obj.insert("id".to_string(), Value::String(id.clone()));
93 }
94
95 let data = serde_json::to_vec(&resource)
97 .map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
98
99 let now = Utc::now();
100 let last_updated = now.to_rfc3339();
101 let version_id = "1";
102 let fhir_version_str = fhir_version.as_mime_param();
103
104 conn.execute(
106 "INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
107 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
108 params![tenant_id, resource_type, id, version_id, data, last_updated, fhir_version_str],
109 )
110 .map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
111
112 conn.execute(
114 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
115 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
116 params![tenant_id, resource_type, id, version_id, data, last_updated, fhir_version_str],
117 )
118 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
119
120 self.index_resource(&conn, tenant_id, resource_type, &id, &resource)?;
122
123 if resource_type == "SearchParameter" {
125 self.handle_search_parameter_create(&resource)?;
126 }
127
128 Ok(StoredResource::from_storage(
130 resource_type,
131 &id,
132 version_id,
133 tenant.tenant_id().clone(),
134 resource,
135 now,
136 now,
137 None,
138 fhir_version,
139 ))
140 }
141
142 async fn create_or_update(
143 &self,
144 tenant: &TenantContext,
145 resource_type: &str,
146 id: &str,
147 resource: Value,
148 fhir_version: FhirVersion,
149 ) -> StorageResult<(StoredResource, bool)> {
150 let existing = self.read(tenant, resource_type, id).await?;
152
153 if let Some(current) = existing {
154 let updated = self.update(tenant, ¤t, resource).await?;
156 Ok((updated, false))
157 } else {
158 let mut resource = resource;
160 if let Some(obj) = resource.as_object_mut() {
161 obj.insert("id".to_string(), Value::String(id.to_string()));
162 }
163 let created = self
164 .create(tenant, resource_type, resource, fhir_version)
165 .await?;
166 Ok((created, true))
167 }
168 }
169
170 async fn read(
171 &self,
172 tenant: &TenantContext,
173 resource_type: &str,
174 id: &str,
175 ) -> StorageResult<Option<StoredResource>> {
176 let conn = self.get_connection()?;
177 let tenant_id = tenant.tenant_id().as_str();
178
179 let result = conn.query_row(
180 "SELECT version_id, data, last_updated, is_deleted, deleted_at, fhir_version
181 FROM resources
182 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
183 params![tenant_id, resource_type, id],
184 |row| {
185 let version_id: String = row.get(0)?;
186 let data: Vec<u8> = row.get(1)?;
187 let last_updated: String = row.get(2)?;
188 let is_deleted: i32 = row.get(3)?;
189 let deleted_at: Option<String> = row.get(4)?;
190 let fhir_version: String = row.get(5)?;
191 Ok((
192 version_id,
193 data,
194 last_updated,
195 is_deleted,
196 deleted_at,
197 fhir_version,
198 ))
199 },
200 );
201
202 match result {
203 Ok((version_id, data, last_updated, is_deleted, deleted_at, fhir_version_str)) => {
204 if is_deleted != 0 {
206 let deleted_at = deleted_at.and_then(|s| {
207 chrono::DateTime::parse_from_rfc3339(&s)
208 .ok()
209 .map(|dt| dt.with_timezone(&Utc))
210 });
211 return Err(StorageError::Resource(ResourceError::Gone {
212 resource_type: resource_type.to_string(),
213 id: id.to_string(),
214 deleted_at,
215 }));
216 }
217
218 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
219 serialization_error(format!("Failed to deserialize resource: {}", e))
220 })?;
221
222 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated)
223 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
224 .with_timezone(&Utc);
225
226 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
228
229 Ok(Some(StoredResource::from_storage(
230 resource_type,
231 id,
232 version_id,
233 tenant.tenant_id().clone(),
234 json_data,
235 last_updated,
236 last_updated,
237 None,
238 fhir_version,
239 )))
240 }
241 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
242 Err(e) => Err(internal_error(format!("Failed to read resource: {}", e))),
243 }
244 }
245
246 async fn update(
247 &self,
248 tenant: &TenantContext,
249 current: &StoredResource,
250 resource: Value,
251 ) -> StorageResult<StoredResource> {
252 let conn = self.get_connection()?;
253 let tenant_id = tenant.tenant_id().as_str();
254 let resource_type = current.resource_type();
255 let id = current.id();
256
257 let actual_version: Result<String, _> = conn.query_row(
259 "SELECT version_id FROM resources
260 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
261 params![tenant_id, resource_type, id],
262 |row| row.get(0),
263 );
264
265 let actual_version = match actual_version {
266 Ok(v) => v,
267 Err(rusqlite::Error::QueryReturnedNoRows) => {
268 return Err(StorageError::Resource(ResourceError::NotFound {
269 resource_type: resource_type.to_string(),
270 id: id.to_string(),
271 }));
272 }
273 Err(e) => {
274 return Err(internal_error(format!(
275 "Failed to get current version: {}",
276 e
277 )));
278 }
279 };
280
281 if actual_version != current.version_id() {
283 return Err(StorageError::Concurrency(
284 ConcurrencyError::VersionConflict {
285 resource_type: resource_type.to_string(),
286 id: id.to_string(),
287 expected_version: current.version_id().to_string(),
288 actual_version,
289 },
290 ));
291 }
292
293 let new_version: u64 = actual_version.parse().unwrap_or(0) + 1;
295 let new_version_str = new_version.to_string();
296
297 let mut resource = resource;
299 if let Some(obj) = resource.as_object_mut() {
300 obj.insert(
301 "resourceType".to_string(),
302 Value::String(resource_type.to_string()),
303 );
304 obj.insert("id".to_string(), Value::String(id.to_string()));
305 }
306
307 let data = serde_json::to_vec(&resource)
309 .map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
310
311 let now = Utc::now();
312 let last_updated = now.to_rfc3339();
313
314 conn.execute(
316 "UPDATE resources SET version_id = ?1, data = ?2, last_updated = ?3
317 WHERE tenant_id = ?4 AND resource_type = ?5 AND id = ?6",
318 params![
319 new_version_str,
320 data,
321 last_updated,
322 tenant_id,
323 resource_type,
324 id
325 ],
326 )
327 .map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
328
329 let fhir_version_str = current.fhir_version().as_mime_param();
331 conn.execute(
332 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
333 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
334 params![tenant_id, resource_type, id, new_version_str, data, last_updated, fhir_version_str],
335 )
336 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
337
338 self.delete_search_index(&conn, tenant_id, resource_type, id)?;
340 self.index_resource(&conn, tenant_id, resource_type, id, &resource)?;
341
342 if resource_type == "SearchParameter" {
344 self.handle_search_parameter_update(current.content(), &resource)?;
345 }
346
347 Ok(StoredResource::from_storage(
348 resource_type,
349 id,
350 new_version_str,
351 tenant.tenant_id().clone(),
352 resource,
353 now,
354 now,
355 None,
356 current.fhir_version(),
357 ))
358 }
359
360 async fn delete(
361 &self,
362 tenant: &TenantContext,
363 resource_type: &str,
364 id: &str,
365 ) -> StorageResult<()> {
366 let conn = self.get_connection()?;
367 let tenant_id = tenant.tenant_id().as_str();
368
369 let result: Result<(String, Vec<u8>, String), _> = conn.query_row(
371 "SELECT version_id, data, fhir_version FROM resources
372 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
373 params![tenant_id, resource_type, id],
374 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
375 );
376
377 let (current_version, data, fhir_version_str) = match result {
378 Ok(v) => v,
379 Err(rusqlite::Error::QueryReturnedNoRows) => {
380 return Err(StorageError::Resource(ResourceError::NotFound {
381 resource_type: resource_type.to_string(),
382 id: id.to_string(),
383 }));
384 }
385 Err(e) => {
386 return Err(internal_error(format!("Failed to check resource: {}", e)));
387 }
388 };
389
390 let now = Utc::now();
391 let deleted_at = now.to_rfc3339();
392
393 let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
395 let new_version_str = new_version.to_string();
396
397 conn.execute(
399 "UPDATE resources SET is_deleted = 1, deleted_at = ?1, version_id = ?2, last_updated = ?1
400 WHERE tenant_id = ?3 AND resource_type = ?4 AND id = ?5",
401 params![deleted_at, new_version_str, tenant_id, resource_type, id],
402 )
403 .map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
404
405 conn.execute(
407 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
408 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, ?7)",
409 params![tenant_id, resource_type, id, new_version_str, data, deleted_at, fhir_version_str],
410 )
411 .map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
412
413 if !self.is_search_offloaded() {
415 conn.execute(
416 "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
417 params![tenant_id, resource_type, id],
418 )
419 .map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
420 }
421
422 if resource_type == "SearchParameter" {
424 if let Ok(resource_json) = serde_json::from_slice::<Value>(&data) {
425 self.handle_search_parameter_delete(&resource_json)?;
426 }
427 }
428
429 Ok(())
430 }
431
432 async fn count(
433 &self,
434 tenant: &TenantContext,
435 resource_type: Option<&str>,
436 ) -> StorageResult<u64> {
437 let conn = self.get_connection()?;
438 let tenant_id = tenant.tenant_id().as_str();
439
440 let count: i64 = if let Some(rt) = resource_type {
441 conn.query_row(
442 "SELECT COUNT(*) FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0",
443 params![tenant_id, rt],
444 |row| row.get(0),
445 )
446 } else {
447 conn.query_row(
448 "SELECT COUNT(*) FROM resources WHERE tenant_id = ?1 AND is_deleted = 0",
449 params![tenant_id],
450 |row| row.get(0),
451 )
452 }
453 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
454
455 Ok(count as u64)
456 }
457}
458
459impl SqliteBackend {
461 pub(crate) fn index_resource(
468 &self,
469 conn: &rusqlite::Connection,
470 tenant_id: &str,
471 resource_type: &str,
472 resource_id: &str,
473 resource: &Value,
474 ) -> StorageResult<()> {
475 if self.is_search_offloaded() {
477 return Ok(());
478 }
479
480 match self.index_resource_dynamic(conn, tenant_id, resource_type, resource_id, resource) {
482 Ok(count) => {
483 tracing::debug!(
484 "Dynamically indexed {} values for {}/{}",
485 count,
486 resource_type,
487 resource_id
488 );
489 }
490 Err(e) => {
491 tracing::warn!(
492 "Dynamic extraction failed for {}/{}: {}. Using minimal fallback (_id, _lastUpdated only).",
493 resource_type,
494 resource_id,
495 e
496 );
497 self.index_minimal_fallback(conn, tenant_id, resource_type, resource_id, resource)?;
499 }
500 }
501
502 self.index_fts_content(conn, tenant_id, resource_type, resource_id, resource)?;
504
505 Ok(())
506 }
507
508 fn index_fts_content(
512 &self,
513 conn: &rusqlite::Connection,
514 tenant_id: &str,
515 resource_type: &str,
516 resource_id: &str,
517 resource: &Value,
518 ) -> StorageResult<()> {
519 use super::search::fts::extract_searchable_content;
520
521 let fts_exists: bool = conn
523 .query_row(
524 "SELECT 1 FROM sqlite_master WHERE type='table' AND name='resource_fts'",
525 [],
526 |_| Ok(true),
527 )
528 .unwrap_or(false);
529
530 if !fts_exists {
531 return Ok(());
533 }
534
535 let content = extract_searchable_content(resource);
537
538 if content.is_empty() {
539 return Ok(());
540 }
541
542 conn.execute(
544 "INSERT INTO resource_fts (resource_id, resource_type, tenant_id, narrative_text, full_content)
545 VALUES (?1, ?2, ?3, ?4, ?5)",
546 params![
547 resource_id,
548 resource_type,
549 tenant_id,
550 content.narrative,
551 content.full_content
552 ],
553 )
554 .map_err(|e| internal_error(format!("Failed to insert FTS content: {}", e)))?;
555
556 Ok(())
557 }
558
559 fn index_resource_dynamic(
563 &self,
564 conn: &rusqlite::Connection,
565 tenant_id: &str,
566 resource_type: &str,
567 resource_id: &str,
568 resource: &Value,
569 ) -> StorageResult<usize> {
570 let values = self
572 .search_extractor()
573 .extract(resource, resource_type)
574 .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
575
576 let mut count = 0;
577 for value in values {
578 self.write_index_entry(conn, tenant_id, resource_type, resource_id, &value)?;
579 count += 1;
580 }
581
582 Ok(count)
583 }
584
585 fn write_index_entry(
587 &self,
588 conn: &rusqlite::Connection,
589 tenant_id: &str,
590 resource_type: &str,
591 resource_id: &str,
592 value: &ExtractedValue,
593 ) -> StorageResult<()> {
594 use crate::search::converters::IndexValue;
595
596 let normalized_value = match &value.value {
598 IndexValue::Date {
599 value: date_str,
600 precision,
601 } => {
602 let normalized_date = Self::normalize_date_for_sqlite(date_str);
603 let mut normalized = value.clone();
604 normalized.value = IndexValue::Date {
605 value: normalized_date,
606 precision: *precision,
607 };
608 Some(normalized)
609 }
610 _ => None,
611 };
612
613 let value_to_use = normalized_value.as_ref().unwrap_or(value);
614 let sql_params = SqliteSearchIndexWriter::to_sql_params(
615 tenant_id,
616 resource_type,
617 resource_id,
618 value_to_use,
619 );
620
621 let param_refs: Vec<&dyn ToSql> = sql_params
623 .iter()
624 .map(|p| self.sql_value_to_ref(p))
625 .collect();
626
627 conn.execute(SqliteSearchIndexWriter::insert_sql(), param_refs.as_slice())
628 .map_err(|e| internal_error(format!("Failed to insert search index entry: {}", e)))?;
629
630 Ok(())
631 }
632
633 fn normalize_date_for_sqlite(value: &str) -> String {
637 if value.contains('T') {
638 value.to_string()
639 } else if value.len() == 10 {
640 format!("{}T00:00:00", value)
642 } else if value.len() == 7 {
643 format!("{}-01T00:00:00", value)
645 } else if value.len() == 4 {
646 format!("{}-01-01T00:00:00", value)
648 } else {
649 value.to_string()
650 }
651 }
652
653 fn sql_value_to_ref<'a>(&'a self, value: &'a super::search::writer::SqlValue) -> &'a dyn ToSql {
655 use super::search::writer::SqlValue;
656 match value {
657 SqlValue::String(s) => s,
658 SqlValue::OptString(opt) => opt,
659 SqlValue::Int(i) => i,
660 SqlValue::OptInt(opt) => opt,
661 SqlValue::Float(f) => f,
662 SqlValue::Null => &rusqlite::types::Null,
663 }
664 }
665
666 pub(crate) fn delete_search_index(
668 &self,
669 conn: &rusqlite::Connection,
670 tenant_id: &str,
671 resource_type: &str,
672 resource_id: &str,
673 ) -> StorageResult<()> {
674 if self.is_search_offloaded() {
676 return Ok(());
677 }
678
679 conn.execute(
681 "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
682 params![tenant_id, resource_type, resource_id],
683 )
684 .map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
685
686 let _ = conn.execute(
688 "DELETE FROM resource_fts WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
689 params![tenant_id, resource_type, resource_id],
690 );
691
692 Ok(())
693 }
694
695 fn index_minimal_fallback(
701 &self,
702 conn: &rusqlite::Connection,
703 tenant_id: &str,
704 resource_type: &str,
705 resource_id: &str,
706 resource: &Value,
707 ) -> StorageResult<()> {
708 if let Some(id) = resource.get("id").and_then(|v| v.as_str()) {
710 self.insert_token_index(conn, tenant_id, resource_type, resource_id, "_id", None, id)?;
711 }
712
713 if let Some(last_updated) = resource
715 .get("meta")
716 .and_then(|m| m.get("lastUpdated"))
717 .and_then(|v| v.as_str())
718 {
719 self.insert_date_index(
720 conn,
721 tenant_id,
722 resource_type,
723 resource_id,
724 "_lastUpdated",
725 last_updated,
726 )?;
727 }
728
729 Ok(())
730 }
731
732 #[allow(clippy::too_many_arguments)]
734 fn insert_token_index(
735 &self,
736 conn: &rusqlite::Connection,
737 tenant_id: &str,
738 resource_type: &str,
739 resource_id: &str,
740 param_name: &str,
741 system: Option<&str>,
742 code: &str,
743 ) -> StorageResult<()> {
744 conn.execute(
745 "INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_token_system, value_token_code)
746 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
747 params![tenant_id, resource_type, resource_id, param_name, system, code],
748 )
749 .map_err(|e| internal_error(format!("Failed to insert token index: {}", e)))?;
750 Ok(())
751 }
752
753 fn insert_date_index(
755 &self,
756 conn: &rusqlite::Connection,
757 tenant_id: &str,
758 resource_type: &str,
759 resource_id: &str,
760 param_name: &str,
761 value: &str,
762 ) -> StorageResult<()> {
763 let normalized = if value.contains('T') {
766 value.to_string()
767 } else if value.len() == 10 {
768 format!("{}T00:00:00", value)
770 } else if value.len() == 7 {
771 format!("{}-01T00:00:00", value)
773 } else if value.len() == 4 {
774 format!("{}-01-01T00:00:00", value)
776 } else {
777 value.to_string()
778 };
779
780 conn.execute(
781 "INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_date)
782 VALUES (?1, ?2, ?3, ?4, ?5)",
783 params![tenant_id, resource_type, resource_id, param_name, normalized],
784 )
785 .map_err(|e| internal_error(format!("Failed to insert date index: {}", e)))?;
786 Ok(())
787 }
788}
789
790impl SqliteBackend {
792 fn handle_search_parameter_create(&self, resource: &Value) -> StorageResult<()> {
798 let loader = SearchParameterLoader::new(FhirVersion::R4);
799
800 match loader.parse_resource(resource) {
801 Ok(def) => {
802 if def.status == SearchParameterStatus::Active {
804 let mut registry = self.search_registry().write();
805 if let Err(e) = registry.register(def) {
807 tracing::debug!("SearchParameter registration skipped: {}", e);
808 }
809 }
810 }
811 Err(e) => {
812 tracing::warn!("Failed to parse SearchParameter for registry: {}", e);
814 }
815 }
816
817 Ok(())
818 }
819
820 fn handle_search_parameter_update(
827 &self,
828 old_resource: &Value,
829 new_resource: &Value,
830 ) -> StorageResult<()> {
831 let loader = SearchParameterLoader::new(FhirVersion::R4);
832
833 let old_def = loader.parse_resource(old_resource).ok();
834 let new_def = loader.parse_resource(new_resource).ok();
835
836 match (old_def, new_def) {
837 (Some(old), Some(new)) => {
838 let mut registry = self.search_registry().write();
839
840 if old.url != new.url {
842 let _ = registry.unregister(&old.url);
843 if new.status == SearchParameterStatus::Active {
844 let _ = registry.register(new);
845 }
846 } else if old.status != new.status {
847 if let Err(e) = registry.update_status(&new.url, new.status) {
849 tracing::debug!("SearchParameter status update skipped: {}", e);
850 }
851 } else {
852 let _ = registry.unregister(&old.url);
854 if new.status == SearchParameterStatus::Active {
855 let _ = registry.register(new);
856 }
857 }
858 }
859 (None, Some(new)) => {
860 if new.status == SearchParameterStatus::Active {
862 let mut registry = self.search_registry().write();
863 let _ = registry.register(new);
864 }
865 }
866 (Some(old), None) => {
867 let mut registry = self.search_registry().write();
869 let _ = registry.unregister(&old.url);
870 }
871 (None, None) => {
872 }
874 }
875
876 Ok(())
877 }
878
879 fn handle_search_parameter_delete(&self, resource: &Value) -> StorageResult<()> {
884 if let Some(url) = resource.get("url").and_then(|v| v.as_str()) {
885 let mut registry = self.search_registry().write();
886 if let Err(e) = registry.unregister(url) {
887 tracing::debug!("SearchParameter unregistration skipped: {}", e);
888 }
889 }
890
891 Ok(())
892 }
893}
894
895#[async_trait]
896impl VersionedStorage for SqliteBackend {
897 async fn vread(
898 &self,
899 tenant: &TenantContext,
900 resource_type: &str,
901 id: &str,
902 version_id: &str,
903 ) -> StorageResult<Option<StoredResource>> {
904 let conn = self.get_connection()?;
905 let tenant_id = tenant.tenant_id().as_str();
906
907 let result = conn.query_row(
908 "SELECT data, last_updated, is_deleted, fhir_version
909 FROM resource_history
910 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND version_id = ?4",
911 params![tenant_id, resource_type, id, version_id],
912 |row| {
913 let data: Vec<u8> = row.get(0)?;
914 let last_updated: String = row.get(1)?;
915 let is_deleted: i32 = row.get(2)?;
916 let fhir_version: String = row.get(3)?;
917 Ok((data, last_updated, is_deleted, fhir_version))
918 },
919 );
920
921 match result {
922 Ok((data, last_updated, is_deleted, fhir_version_str)) => {
923 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
924 serialization_error(format!("Failed to deserialize resource: {}", e))
925 })?;
926
927 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated)
928 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
929 .with_timezone(&Utc);
930
931 let deleted_at = if is_deleted != 0 {
933 Some(last_updated)
934 } else {
935 None
936 };
937
938 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
939
940 Ok(Some(StoredResource::from_storage(
941 resource_type,
942 id,
943 version_id,
944 tenant.tenant_id().clone(),
945 json_data,
946 last_updated,
947 last_updated,
948 deleted_at,
949 fhir_version,
950 )))
951 }
952 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
953 Err(e) => Err(internal_error(format!("Failed to read version: {}", e))),
954 }
955 }
956
957 async fn update_with_match(
958 &self,
959 tenant: &TenantContext,
960 resource_type: &str,
961 id: &str,
962 expected_version: &str,
963 resource: Value,
964 ) -> StorageResult<StoredResource> {
965 let current = self.read(tenant, resource_type, id).await?.ok_or_else(|| {
967 StorageError::Resource(ResourceError::NotFound {
968 resource_type: resource_type.to_string(),
969 id: id.to_string(),
970 })
971 })?;
972
973 if current.version_id() != expected_version {
975 return Err(StorageError::Concurrency(
976 ConcurrencyError::VersionConflict {
977 resource_type: resource_type.to_string(),
978 id: id.to_string(),
979 expected_version: expected_version.to_string(),
980 actual_version: current.version_id().to_string(),
981 },
982 ));
983 }
984
985 self.update(tenant, ¤t, resource).await
987 }
988
989 async fn delete_with_match(
990 &self,
991 tenant: &TenantContext,
992 resource_type: &str,
993 id: &str,
994 expected_version: &str,
995 ) -> StorageResult<()> {
996 let conn = self.get_connection()?;
997 let tenant_id = tenant.tenant_id().as_str();
998
999 let current_version: Result<String, _> = conn.query_row(
1001 "SELECT version_id FROM resources
1002 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
1003 params![tenant_id, resource_type, id],
1004 |row| row.get(0),
1005 );
1006
1007 let current_version = match current_version {
1008 Ok(v) => v,
1009 Err(rusqlite::Error::QueryReturnedNoRows) => {
1010 return Err(StorageError::Resource(ResourceError::NotFound {
1011 resource_type: resource_type.to_string(),
1012 id: id.to_string(),
1013 }));
1014 }
1015 Err(e) => {
1016 return Err(internal_error(format!(
1017 "Failed to get current version: {}",
1018 e
1019 )));
1020 }
1021 };
1022
1023 if current_version != expected_version {
1024 return Err(StorageError::Concurrency(
1025 ConcurrencyError::VersionConflict {
1026 resource_type: resource_type.to_string(),
1027 id: id.to_string(),
1028 expected_version: expected_version.to_string(),
1029 actual_version: current_version,
1030 },
1031 ));
1032 }
1033
1034 self.delete(tenant, resource_type, id).await
1036 }
1037
1038 async fn list_versions(
1039 &self,
1040 tenant: &TenantContext,
1041 resource_type: &str,
1042 id: &str,
1043 ) -> StorageResult<Vec<String>> {
1044 let conn = self.get_connection()?;
1045 let tenant_id = tenant.tenant_id().as_str();
1046
1047 let mut stmt = conn
1048 .prepare(
1049 "SELECT version_id FROM resource_history
1050 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3
1051 ORDER BY CAST(version_id AS INTEGER) ASC",
1052 )
1053 .map_err(|e| internal_error(format!("Failed to prepare query: {}", e)))?;
1054
1055 let versions = stmt
1056 .query_map(params![tenant_id, resource_type, id], |row| row.get(0))
1057 .map_err(|e| internal_error(format!("Failed to list versions: {}", e)))?
1058 .filter_map(|r| r.ok())
1059 .collect();
1060
1061 Ok(versions)
1062 }
1063}
1064
1065#[async_trait]
1066impl InstanceHistoryProvider for SqliteBackend {
1067 async fn history_instance(
1068 &self,
1069 tenant: &TenantContext,
1070 resource_type: &str,
1071 id: &str,
1072 params: &HistoryParams,
1073 ) -> StorageResult<HistoryPage> {
1074 let conn = self.get_connection()?;
1075 let tenant_id = tenant.tenant_id().as_str();
1076
1077 let mut sql = String::from(
1079 "SELECT version_id, data, last_updated, is_deleted, fhir_version
1080 FROM resource_history
1081 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1082 );
1083
1084 if !params.include_deleted {
1086 sql.push_str(" AND is_deleted = 0");
1087 }
1088
1089 if let Some(since) = ¶ms.since {
1091 sql.push_str(&format!(" AND last_updated >= '{}'", since.to_rfc3339()));
1092 }
1093
1094 if let Some(before) = ¶ms.before {
1096 sql.push_str(&format!(" AND last_updated < '{}'", before.to_rfc3339()));
1097 }
1098
1099 if let Some(cursor) = params.pagination.cursor_value() {
1101 if let Some(CursorValue::String(version_str)) = cursor.sort_values().first() {
1103 sql.push_str(&format!(
1105 " AND CAST(version_id AS INTEGER) < {}",
1106 version_str.parse::<i64>().unwrap_or(i64::MAX)
1107 ));
1108 }
1109 }
1110
1111 sql.push_str(" ORDER BY CAST(version_id AS INTEGER) DESC");
1113 sql.push_str(&format!(" LIMIT {}", params.pagination.count + 1)); let mut stmt = conn
1116 .prepare(&sql)
1117 .map_err(|e| internal_error(format!("Failed to prepare history query: {}", e)))?;
1118
1119 let rows = stmt
1120 .query_map(params![tenant_id, resource_type, id], |row| {
1121 let version_id: String = row.get(0)?;
1122 let data: Vec<u8> = row.get(1)?;
1123 let last_updated: String = row.get(2)?;
1124 let is_deleted: i32 = row.get(3)?;
1125 let fhir_version: String = row.get(4)?;
1126 Ok((version_id, data, last_updated, is_deleted, fhir_version))
1127 })
1128 .map_err(|e| internal_error(format!("Failed to query history: {}", e)))?;
1129
1130 let mut entries = Vec::new();
1131 let mut last_version: Option<String> = None;
1132
1133 for row in rows {
1134 let (version_id, data, last_updated_str, is_deleted, fhir_version_str) =
1135 row.map_err(|e| internal_error(format!("Failed to read history row: {}", e)))?;
1136
1137 if entries.len() >= params.pagination.count as usize {
1139 break;
1140 }
1141
1142 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
1143 serialization_error(format!("Failed to deserialize resource: {}", e))
1144 })?;
1145
1146 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated_str)
1147 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
1148 .with_timezone(&Utc);
1149
1150 let deleted_at = if is_deleted != 0 {
1151 Some(last_updated)
1152 } else {
1153 None
1154 };
1155
1156 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
1157
1158 let resource = StoredResource::from_storage(
1159 resource_type,
1160 id,
1161 &version_id,
1162 tenant.tenant_id().clone(),
1163 json_data,
1164 last_updated,
1165 last_updated,
1166 deleted_at,
1167 fhir_version,
1168 );
1169
1170 let method = if is_deleted != 0 {
1172 HistoryMethod::Delete
1173 } else if version_id == "1" {
1174 HistoryMethod::Post
1175 } else {
1176 HistoryMethod::Put
1177 };
1178
1179 last_version = Some(version_id);
1180
1181 entries.push(HistoryEntry {
1182 resource,
1183 method,
1184 timestamp: last_updated,
1185 });
1186 }
1187
1188 let has_more = stmt
1190 .query_map(params![tenant_id, resource_type, id], |_| Ok(()))
1191 .map_err(|e| internal_error(format!("Failed to check for more results: {}", e)))?
1192 .count()
1193 > params.pagination.count as usize;
1194
1195 let page_info = if let (true, Some(version)) = (has_more, last_version) {
1197 let cursor = PageCursor::new(vec![CursorValue::String(version)], id.to_string());
1198 PageInfo::with_next(cursor)
1199 } else {
1200 PageInfo::end()
1201 };
1202
1203 Ok(Page::new(entries, page_info))
1204 }
1205
1206 async fn history_instance_count(
1207 &self,
1208 tenant: &TenantContext,
1209 resource_type: &str,
1210 id: &str,
1211 ) -> StorageResult<u64> {
1212 let conn = self.get_connection()?;
1213 let tenant_id = tenant.tenant_id().as_str();
1214
1215 let count: i64 = conn
1216 .query_row(
1217 "SELECT COUNT(*) FROM resource_history
1218 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1219 params![tenant_id, resource_type, id],
1220 |row| row.get(0),
1221 )
1222 .map_err(|e| internal_error(format!("Failed to count history: {}", e)))?;
1223
1224 Ok(count as u64)
1225 }
1226
1227 async fn delete_instance_history(
1238 &self,
1239 tenant: &TenantContext,
1240 resource_type: &str,
1241 id: &str,
1242 ) -> StorageResult<u64> {
1243 let conn = self.get_connection()?;
1244 let tenant_id = tenant.tenant_id().as_str();
1245
1246 let exists: bool = conn
1248 .query_row(
1249 "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1250 params![tenant_id, resource_type, id],
1251 |_| Ok(true),
1252 )
1253 .unwrap_or(false);
1254
1255 if !exists {
1256 return Err(StorageError::Resource(ResourceError::NotFound {
1257 resource_type: resource_type.to_string(),
1258 id: id.to_string(),
1259 }));
1260 }
1261
1262 let current_version: String = conn
1264 .query_row(
1265 "SELECT version_id FROM resources
1266 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1267 params![tenant_id, resource_type, id],
1268 |row| row.get(0),
1269 )
1270 .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
1271
1272 let deleted = conn
1275 .execute(
1276 "DELETE FROM resource_history
1277 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND version_id != ?4",
1278 params![tenant_id, resource_type, id, current_version],
1279 )
1280 .map_err(|e| internal_error(format!("Failed to delete history: {}", e)))?;
1281
1282 Ok(deleted as u64)
1283 }
1284
1285 async fn delete_version(
1291 &self,
1292 tenant: &TenantContext,
1293 resource_type: &str,
1294 id: &str,
1295 version_id: &str,
1296 ) -> StorageResult<()> {
1297 let conn = self.get_connection()?;
1298 let tenant_id = tenant.tenant_id().as_str();
1299
1300 let current_version: Result<String, _> = conn.query_row(
1302 "SELECT version_id FROM resources
1303 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1304 params![tenant_id, resource_type, id],
1305 |row| row.get(0),
1306 );
1307
1308 let current_version = match current_version {
1309 Ok(v) => v,
1310 Err(rusqlite::Error::QueryReturnedNoRows) => {
1311 return Err(StorageError::Resource(ResourceError::NotFound {
1312 resource_type: resource_type.to_string(),
1313 id: id.to_string(),
1314 }));
1315 }
1316 Err(e) => {
1317 return Err(internal_error(format!(
1318 "Failed to get current version: {}",
1319 e
1320 )));
1321 }
1322 };
1323
1324 if version_id == current_version {
1326 return Err(StorageError::Validation(
1327 crate::error::ValidationError::InvalidResource {
1328 message: format!(
1329 "Cannot delete current version {} of {}/{}. Use DELETE on the resource instead.",
1330 version_id, resource_type, id
1331 ),
1332 details: vec![],
1333 },
1334 ));
1335 }
1336
1337 let version_exists: bool = conn
1339 .query_row(
1340 "SELECT 1 FROM resource_history
1341 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND version_id = ?4",
1342 params![tenant_id, resource_type, id, version_id],
1343 |_| Ok(true),
1344 )
1345 .unwrap_or(false);
1346
1347 if !version_exists {
1348 return Err(StorageError::Resource(ResourceError::VersionNotFound {
1349 resource_type: resource_type.to_string(),
1350 id: id.to_string(),
1351 version_id: version_id.to_string(),
1352 }));
1353 }
1354
1355 conn.execute(
1357 "DELETE FROM resource_history
1358 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND version_id = ?4",
1359 params![tenant_id, resource_type, id, version_id],
1360 )
1361 .map_err(|e| internal_error(format!("Failed to delete version: {}", e)))?;
1362
1363 Ok(())
1364 }
1365}
1366
1367#[async_trait]
1368impl TypeHistoryProvider for SqliteBackend {
1369 async fn history_type(
1370 &self,
1371 tenant: &TenantContext,
1372 resource_type: &str,
1373 params: &HistoryParams,
1374 ) -> StorageResult<HistoryPage> {
1375 let conn = self.get_connection()?;
1376 let tenant_id = tenant.tenant_id().as_str();
1377
1378 let mut sql = String::from(
1380 "SELECT id, version_id, data, last_updated, is_deleted, fhir_version
1381 FROM resource_history
1382 WHERE tenant_id = ?1 AND resource_type = ?2",
1383 );
1384
1385 if !params.include_deleted {
1387 sql.push_str(" AND is_deleted = 0");
1388 }
1389
1390 if let Some(since) = ¶ms.since {
1392 sql.push_str(&format!(" AND last_updated >= '{}'", since.to_rfc3339()));
1393 }
1394
1395 if let Some(before) = ¶ms.before {
1397 sql.push_str(&format!(" AND last_updated < '{}'", before.to_rfc3339()));
1398 }
1399
1400 if let Some(cursor) = params.pagination.cursor_value() {
1403 let sort_values = cursor.sort_values();
1404 if sort_values.len() >= 2 {
1405 if let (
1406 Some(CursorValue::String(timestamp)),
1407 Some(CursorValue::String(resource_id)),
1408 ) = (sort_values.first(), sort_values.get(1))
1409 {
1410 sql.push_str(&format!(
1412 " AND (last_updated < '{}' OR (last_updated = '{}' AND id < '{}'))",
1413 timestamp, timestamp, resource_id
1414 ));
1415 }
1416 }
1417 }
1418
1419 sql.push_str(" ORDER BY last_updated DESC, id DESC, CAST(version_id AS INTEGER) DESC");
1421 sql.push_str(&format!(" LIMIT {}", params.pagination.count + 1)); let mut stmt = conn
1424 .prepare(&sql)
1425 .map_err(|e| internal_error(format!("Failed to prepare type history query: {}", e)))?;
1426
1427 let rows = stmt
1428 .query_map(params![tenant_id, resource_type], |row| {
1429 let id: String = row.get(0)?;
1430 let version_id: String = row.get(1)?;
1431 let data: Vec<u8> = row.get(2)?;
1432 let last_updated: String = row.get(3)?;
1433 let is_deleted: i32 = row.get(4)?;
1434 let fhir_version: String = row.get(5)?;
1435 Ok((id, version_id, data, last_updated, is_deleted, fhir_version))
1436 })
1437 .map_err(|e| internal_error(format!("Failed to query type history: {}", e)))?;
1438
1439 let mut entries = Vec::new();
1440 let mut last_entry: Option<(String, String)> = None; for row in rows {
1443 let (id, version_id, data, last_updated_str, is_deleted, fhir_version_str) =
1444 row.map_err(|e| internal_error(format!("Failed to read type history row: {}", e)))?;
1445
1446 if entries.len() >= params.pagination.count as usize {
1448 break;
1449 }
1450
1451 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
1452 serialization_error(format!("Failed to deserialize resource: {}", e))
1453 })?;
1454
1455 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated_str)
1456 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
1457 .with_timezone(&Utc);
1458
1459 let deleted_at = if is_deleted != 0 {
1460 Some(last_updated)
1461 } else {
1462 None
1463 };
1464
1465 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
1466
1467 let resource = StoredResource::from_storage(
1468 resource_type,
1469 &id,
1470 &version_id,
1471 tenant.tenant_id().clone(),
1472 json_data,
1473 last_updated,
1474 last_updated,
1475 deleted_at,
1476 fhir_version,
1477 );
1478
1479 let method = if is_deleted != 0 {
1481 HistoryMethod::Delete
1482 } else if version_id == "1" {
1483 HistoryMethod::Post
1484 } else {
1485 HistoryMethod::Put
1486 };
1487
1488 last_entry = Some((last_updated_str.clone(), id));
1489
1490 entries.push(HistoryEntry {
1491 resource,
1492 method,
1493 timestamp: last_updated,
1494 });
1495 }
1496
1497 let _total_fetched = entries.len();
1499 let has_more = {
1500 let check_sql = sql.replace(
1502 &format!(" LIMIT {}", params.pagination.count + 1),
1503 &format!(" LIMIT {}", params.pagination.count + 2),
1504 );
1505 let mut check_stmt = conn
1506 .prepare(&check_sql)
1507 .map_err(|e| internal_error(format!("Failed to prepare check query: {}", e)))?;
1508 let check_count = check_stmt
1509 .query_map(params![tenant_id, resource_type], |_| Ok(()))
1510 .map_err(|e| internal_error(format!("Failed to check for more results: {}", e)))?
1511 .count();
1512 check_count > params.pagination.count as usize
1513 };
1514
1515 let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
1517 let cursor = PageCursor::new(
1518 vec![CursorValue::String(timestamp), CursorValue::String(id)],
1519 resource_type.to_string(),
1520 );
1521 PageInfo::with_next(cursor)
1522 } else {
1523 PageInfo::end()
1524 };
1525
1526 Ok(Page::new(entries, page_info))
1527 }
1528
1529 async fn history_type_count(
1530 &self,
1531 tenant: &TenantContext,
1532 resource_type: &str,
1533 ) -> StorageResult<u64> {
1534 let conn = self.get_connection()?;
1535 let tenant_id = tenant.tenant_id().as_str();
1536
1537 let count: i64 = conn
1538 .query_row(
1539 "SELECT COUNT(*) FROM resource_history
1540 WHERE tenant_id = ?1 AND resource_type = ?2",
1541 params![tenant_id, resource_type],
1542 |row| row.get(0),
1543 )
1544 .map_err(|e| internal_error(format!("Failed to count type history: {}", e)))?;
1545
1546 Ok(count as u64)
1547 }
1548}
1549
1550#[async_trait]
1551impl SystemHistoryProvider for SqliteBackend {
1552 async fn history_system(
1553 &self,
1554 tenant: &TenantContext,
1555 params: &HistoryParams,
1556 ) -> StorageResult<HistoryPage> {
1557 let conn = self.get_connection()?;
1558 let tenant_id = tenant.tenant_id().as_str();
1559
1560 let mut sql = String::from(
1562 "SELECT resource_type, id, version_id, data, last_updated, is_deleted, fhir_version
1563 FROM resource_history
1564 WHERE tenant_id = ?1",
1565 );
1566
1567 if !params.include_deleted {
1569 sql.push_str(" AND is_deleted = 0");
1570 }
1571
1572 if let Some(since) = ¶ms.since {
1574 sql.push_str(&format!(" AND last_updated >= '{}'", since.to_rfc3339()));
1575 }
1576
1577 if let Some(before) = ¶ms.before {
1579 sql.push_str(&format!(" AND last_updated < '{}'", before.to_rfc3339()));
1580 }
1581
1582 if let Some(cursor) = params.pagination.cursor_value() {
1585 let sort_values = cursor.sort_values();
1586 if sort_values.len() >= 3 {
1587 if let (
1588 Some(CursorValue::String(timestamp)),
1589 Some(CursorValue::String(res_type)),
1590 Some(CursorValue::String(res_id)),
1591 ) = (sort_values.first(), sort_values.get(1), sort_values.get(2))
1592 {
1593 sql.push_str(&format!(
1595 " AND (last_updated < '{}' OR (last_updated = '{}' AND (resource_type < '{}' OR (resource_type = '{}' AND id < '{}'))))",
1596 timestamp, timestamp, res_type, res_type, res_id
1597 ));
1598 }
1599 }
1600 }
1601
1602 sql.push_str(" ORDER BY last_updated DESC, resource_type DESC, id DESC, CAST(version_id AS INTEGER) DESC");
1604 sql.push_str(&format!(" LIMIT {}", params.pagination.count + 1)); let mut stmt = conn.prepare(&sql).map_err(|e| {
1607 internal_error(format!("Failed to prepare system history query: {}", e))
1608 })?;
1609
1610 let rows = stmt
1611 .query_map(params![tenant_id], |row| {
1612 let resource_type: String = row.get(0)?;
1613 let id: String = row.get(1)?;
1614 let version_id: String = row.get(2)?;
1615 let data: Vec<u8> = row.get(3)?;
1616 let last_updated: String = row.get(4)?;
1617 let is_deleted: i32 = row.get(5)?;
1618 let fhir_version: String = row.get(6)?;
1619 Ok((
1620 resource_type,
1621 id,
1622 version_id,
1623 data,
1624 last_updated,
1625 is_deleted,
1626 fhir_version,
1627 ))
1628 })
1629 .map_err(|e| internal_error(format!("Failed to query system history: {}", e)))?;
1630
1631 let mut entries = Vec::new();
1632 let mut last_entry: Option<(String, String, String)> = None; for row in rows {
1635 let (
1636 resource_type,
1637 id,
1638 version_id,
1639 data,
1640 last_updated_str,
1641 is_deleted,
1642 fhir_version_str,
1643 ) = row
1644 .map_err(|e| internal_error(format!("Failed to read system history row: {}", e)))?;
1645
1646 if entries.len() >= params.pagination.count as usize {
1648 break;
1649 }
1650
1651 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
1652 serialization_error(format!("Failed to deserialize resource: {}", e))
1653 })?;
1654
1655 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated_str)
1656 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
1657 .with_timezone(&Utc);
1658
1659 let deleted_at = if is_deleted != 0 {
1660 Some(last_updated)
1661 } else {
1662 None
1663 };
1664
1665 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
1666
1667 let resource = StoredResource::from_storage(
1668 &resource_type,
1669 &id,
1670 &version_id,
1671 tenant.tenant_id().clone(),
1672 json_data,
1673 last_updated,
1674 last_updated,
1675 deleted_at,
1676 fhir_version,
1677 );
1678
1679 let method = if is_deleted != 0 {
1681 HistoryMethod::Delete
1682 } else if version_id == "1" {
1683 HistoryMethod::Post
1684 } else {
1685 HistoryMethod::Put
1686 };
1687
1688 last_entry = Some((last_updated_str.clone(), resource_type, id));
1689
1690 entries.push(HistoryEntry {
1691 resource,
1692 method,
1693 timestamp: last_updated,
1694 });
1695 }
1696
1697 let has_more = {
1699 let check_sql = sql.replace(
1700 &format!(" LIMIT {}", params.pagination.count + 1),
1701 &format!(" LIMIT {}", params.pagination.count + 2),
1702 );
1703 let mut check_stmt = conn
1704 .prepare(&check_sql)
1705 .map_err(|e| internal_error(format!("Failed to prepare check query: {}", e)))?;
1706 let check_count = check_stmt
1707 .query_map(params![tenant_id], |_| Ok(()))
1708 .map_err(|e| internal_error(format!("Failed to check for more results: {}", e)))?
1709 .count();
1710 check_count > params.pagination.count as usize
1711 };
1712
1713 let page_info = if let (true, Some((timestamp, resource_type, id))) = (has_more, last_entry)
1715 {
1716 let cursor = PageCursor::new(
1717 vec![
1718 CursorValue::String(timestamp),
1719 CursorValue::String(resource_type),
1720 CursorValue::String(id),
1721 ],
1722 "system".to_string(),
1723 );
1724 PageInfo::with_next(cursor)
1725 } else {
1726 PageInfo::end()
1727 };
1728
1729 Ok(Page::new(entries, page_info))
1730 }
1731
1732 async fn history_system_count(&self, tenant: &TenantContext) -> StorageResult<u64> {
1733 let conn = self.get_connection()?;
1734 let tenant_id = tenant.tenant_id().as_str();
1735
1736 let count: i64 = conn
1737 .query_row(
1738 "SELECT COUNT(*) FROM resource_history WHERE tenant_id = ?1",
1739 params![tenant_id],
1740 |row| row.get(0),
1741 )
1742 .map_err(|e| internal_error(format!("Failed to count system history: {}", e)))?;
1743
1744 Ok(count as u64)
1745 }
1746}
1747
1748#[async_trait]
1749impl PurgableStorage for SqliteBackend {
1750 async fn purge(
1751 &self,
1752 tenant: &TenantContext,
1753 resource_type: &str,
1754 id: &str,
1755 ) -> StorageResult<()> {
1756 let conn = self.get_connection()?;
1757 let tenant_id = tenant.tenant_id().as_str();
1758
1759 let exists: bool = conn
1761 .query_row(
1762 "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1763 params![tenant_id, resource_type, id],
1764 |_| Ok(true),
1765 )
1766 .unwrap_or(false);
1767
1768 if !exists {
1769 let history_exists: bool = conn
1771 .query_row(
1772 "SELECT 1 FROM resource_history WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1773 params![tenant_id, resource_type, id],
1774 |_| Ok(true),
1775 )
1776 .unwrap_or(false);
1777
1778 if !history_exists {
1779 return Err(StorageError::Resource(ResourceError::NotFound {
1780 resource_type: resource_type.to_string(),
1781 id: id.to_string(),
1782 }));
1783 }
1784 }
1785
1786 conn.execute(
1788 "DELETE FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1789 params![tenant_id, resource_type, id],
1790 )
1791 .map_err(|e| internal_error(format!("Failed to purge resource: {}", e)))?;
1792
1793 conn.execute(
1795 "DELETE FROM resource_history WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
1796 params![tenant_id, resource_type, id],
1797 )
1798 .map_err(|e| internal_error(format!("Failed to purge resource history: {}", e)))?;
1799
1800 conn.execute(
1802 "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
1803 params![tenant_id, resource_type, id],
1804 )
1805 .map_err(|e| internal_error(format!("Failed to purge search index: {}", e)))?;
1806
1807 Ok(())
1808 }
1809
1810 async fn purge_all(&self, tenant: &TenantContext, resource_type: &str) -> StorageResult<u64> {
1811 let conn = self.get_connection()?;
1812 let tenant_id = tenant.tenant_id().as_str();
1813
1814 let count: i64 = conn
1816 .query_row(
1817 "SELECT COUNT(DISTINCT id) FROM resources WHERE tenant_id = ?1 AND resource_type = ?2",
1818 params![tenant_id, resource_type],
1819 |row| row.get(0),
1820 )
1821 .unwrap_or(0);
1822
1823 conn.execute(
1825 "DELETE FROM resources WHERE tenant_id = ?1 AND resource_type = ?2",
1826 params![tenant_id, resource_type],
1827 )
1828 .map_err(|e| internal_error(format!("Failed to purge resources: {}", e)))?;
1829
1830 conn.execute(
1832 "DELETE FROM resource_history WHERE tenant_id = ?1 AND resource_type = ?2",
1833 params![tenant_id, resource_type],
1834 )
1835 .map_err(|e| internal_error(format!("Failed to purge resource history: {}", e)))?;
1836
1837 conn.execute(
1839 "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2",
1840 params![tenant_id, resource_type],
1841 )
1842 .map_err(|e| internal_error(format!("Failed to purge search index: {}", e)))?;
1843
1844 Ok(count as u64)
1845 }
1846}
1847
1848#[async_trait]
1849impl DifferentialHistoryProvider for SqliteBackend {
1850 async fn modified_since(
1851 &self,
1852 tenant: &TenantContext,
1853 resource_type: Option<&str>,
1854 since: chrono::DateTime<Utc>,
1855 pagination: &Pagination,
1856 ) -> StorageResult<Page<StoredResource>> {
1857 let conn = self.get_connection()?;
1858 let tenant_id = tenant.tenant_id().as_str();
1859 let since_str = since.to_rfc3339();
1860
1861 let mut sql = String::from(
1863 "SELECT resource_type, id, version_id, data, last_updated, fhir_version
1864 FROM resources
1865 WHERE tenant_id = ?1 AND last_updated > ?2 AND is_deleted = 0",
1866 );
1867
1868 if let Some(rt) = resource_type {
1870 sql.push_str(&format!(" AND resource_type = '{}'", rt));
1871 }
1872
1873 if let Some(cursor) = pagination.cursor_value() {
1875 let sort_values = cursor.sort_values();
1876 if sort_values.len() >= 2 {
1877 if let (Some(CursorValue::String(timestamp)), Some(CursorValue::String(res_id))) =
1878 (sort_values.first(), sort_values.get(1))
1879 {
1880 sql.push_str(&format!(
1881 " AND (last_updated > '{}' OR (last_updated = '{}' AND id > '{}'))",
1882 timestamp, timestamp, res_id
1883 ));
1884 }
1885 }
1886 }
1887
1888 sql.push_str(" ORDER BY last_updated ASC, id ASC");
1890 sql.push_str(&format!(" LIMIT {}", pagination.count + 1));
1891
1892 let mut stmt = conn.prepare(&sql).map_err(|e| {
1893 internal_error(format!("Failed to prepare modified_since query: {}", e))
1894 })?;
1895
1896 let rows = stmt
1897 .query_map(params![tenant_id, since_str], |row| {
1898 let resource_type: String = row.get(0)?;
1899 let id: String = row.get(1)?;
1900 let version_id: String = row.get(2)?;
1901 let data: Vec<u8> = row.get(3)?;
1902 let last_updated: String = row.get(4)?;
1903 let fhir_version: String = row.get(5)?;
1904 Ok((
1905 resource_type,
1906 id,
1907 version_id,
1908 data,
1909 last_updated,
1910 fhir_version,
1911 ))
1912 })
1913 .map_err(|e| internal_error(format!("Failed to query modified resources: {}", e)))?;
1914
1915 let mut resources = Vec::new();
1916 let mut last_entry: Option<(String, String)> = None; for row in rows {
1919 let (resource_type, id, version_id, data, last_updated_str, fhir_version_str) =
1920 row.map_err(|e| internal_error(format!("Failed to read row: {}", e)))?;
1921
1922 if resources.len() >= pagination.count as usize {
1924 break;
1925 }
1926
1927 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
1928 serialization_error(format!("Failed to deserialize resource: {}", e))
1929 })?;
1930
1931 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated_str)
1932 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
1933 .with_timezone(&Utc);
1934
1935 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
1936
1937 let resource = StoredResource::from_storage(
1938 &resource_type,
1939 &id,
1940 &version_id,
1941 tenant.tenant_id().clone(),
1942 json_data,
1943 last_updated,
1944 last_updated,
1945 None,
1946 fhir_version,
1947 );
1948
1949 last_entry = Some((last_updated_str, id));
1950 resources.push(resource);
1951 }
1952
1953 let has_more = {
1955 let check_sql = sql.replace(
1956 &format!(" LIMIT {}", pagination.count + 1),
1957 &format!(" LIMIT {}", pagination.count + 2),
1958 );
1959 let mut check_stmt = conn
1960 .prepare(&check_sql)
1961 .map_err(|e| internal_error(format!("Failed to prepare check query: {}", e)))?;
1962 let check_count = check_stmt
1963 .query_map(params![tenant_id, since_str], |_| Ok(()))
1964 .map_err(|e| internal_error(format!("Failed to check for more results: {}", e)))?
1965 .count();
1966 check_count > pagination.count as usize
1967 };
1968
1969 let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
1971 let cursor = PageCursor::new(
1972 vec![CursorValue::String(timestamp), CursorValue::String(id)],
1973 "modified_since".to_string(),
1974 );
1975 PageInfo::with_next(cursor)
1976 } else {
1977 PageInfo::end()
1978 };
1979
1980 Ok(Page::new(resources, page_info))
1981 }
1982}
1983
/// Splits a `key=value&key=value` query string into owned `(key, value)` pairs.
///
/// Each `&`-separated segment is split at its first `=`; anything after that
/// first `=` (including further `=` characters) belongs to the value. Segments
/// that contain no `=` are dropped. No percent-decoding is performed.
fn parse_simple_search_params(params: &str) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for segment in params.split('&') {
        if let Some((key, value)) = segment.split_once('=') {
            pairs.push((key.to_owned(), value.to_owned()));
        }
    }
    pairs
}
1999
2000#[async_trait]
2001impl ConditionalStorage for SqliteBackend {
2002 async fn conditional_create(
2003 &self,
2004 tenant: &TenantContext,
2005 resource_type: &str,
2006 resource: Value,
2007 search_params: &str,
2008 fhir_version: FhirVersion,
2009 ) -> StorageResult<ConditionalCreateResult> {
2010 let matches = self
2012 .find_matching_resources(tenant, resource_type, search_params)
2013 .await?;
2014
2015 match matches.len() {
2016 0 => {
2017 let created = self
2019 .create(tenant, resource_type, resource, fhir_version)
2020 .await?;
2021 Ok(ConditionalCreateResult::Created(created))
2022 }
2023 1 => {
2024 Ok(ConditionalCreateResult::Exists(
2026 matches.into_iter().next().unwrap(),
2027 ))
2028 }
2029 n => {
2030 Ok(ConditionalCreateResult::MultipleMatches(n))
2032 }
2033 }
2034 }
2035
2036 async fn conditional_update(
2037 &self,
2038 tenant: &TenantContext,
2039 resource_type: &str,
2040 resource: Value,
2041 search_params: &str,
2042 upsert: bool,
2043 fhir_version: FhirVersion,
2044 ) -> StorageResult<ConditionalUpdateResult> {
2045 let matches = self
2047 .find_matching_resources(tenant, resource_type, search_params)
2048 .await?;
2049
2050 match matches.len() {
2051 0 => {
2052 if upsert {
2053 let created = self
2055 .create(tenant, resource_type, resource, fhir_version)
2056 .await?;
2057 Ok(ConditionalUpdateResult::Created(created))
2058 } else {
2059 Ok(ConditionalUpdateResult::NoMatch)
2061 }
2062 }
2063 1 => {
2064 let existing = matches.into_iter().next().unwrap();
2066 let updated = self.update(tenant, &existing, resource).await?;
2067 Ok(ConditionalUpdateResult::Updated(updated))
2068 }
2069 n => {
2070 Ok(ConditionalUpdateResult::MultipleMatches(n))
2072 }
2073 }
2074 }
2075
2076 async fn conditional_delete(
2077 &self,
2078 tenant: &TenantContext,
2079 resource_type: &str,
2080 search_params: &str,
2081 ) -> StorageResult<ConditionalDeleteResult> {
2082 let matches = self
2084 .find_matching_resources(tenant, resource_type, search_params)
2085 .await?;
2086
2087 match matches.len() {
2088 0 => {
2089 Ok(ConditionalDeleteResult::NoMatch)
2091 }
2092 1 => {
2093 let existing = matches.into_iter().next().unwrap();
2095 self.delete(tenant, resource_type, existing.id()).await?;
2096 Ok(ConditionalDeleteResult::Deleted)
2097 }
2098 n => {
2099 Ok(ConditionalDeleteResult::MultipleMatches(n))
2101 }
2102 }
2103 }
2104
2105 async fn conditional_patch(
2115 &self,
2116 tenant: &TenantContext,
2117 resource_type: &str,
2118 search_params: &str,
2119 patch: &crate::core::PatchFormat,
2120 ) -> StorageResult<crate::core::ConditionalPatchResult> {
2121 use crate::core::{ConditionalPatchResult, PatchFormat};
2122
2123 let matches = self
2125 .find_matching_resources(tenant, resource_type, search_params)
2126 .await?;
2127
2128 match matches.len() {
2129 0 => Ok(ConditionalPatchResult::NoMatch),
2130 1 => {
2131 let existing = matches.into_iter().next().unwrap();
2133 let current_content = existing.content().clone();
2134
2135 let patched_content = match patch {
2137 PatchFormat::JsonPatch(patch_doc) => {
2138 self.apply_json_patch(¤t_content, patch_doc)?
2139 }
2140 PatchFormat::FhirPathPatch(patch_params) => {
2141 self.apply_fhirpath_patch(¤t_content, patch_params)?
2142 }
2143 PatchFormat::MergePatch(merge_doc) => {
2144 self.apply_merge_patch(¤t_content, merge_doc)
2145 }
2146 };
2147
2148 let updated = self.update(tenant, &existing, patched_content).await?;
2150 Ok(ConditionalPatchResult::Patched(updated))
2151 }
2152 n => Ok(ConditionalPatchResult::MultipleMatches(n)),
2153 }
2154 }
2155}
2156
2157impl SqliteBackend {
2158 async fn find_matching_resources(
2163 &self,
2164 tenant: &TenantContext,
2165 resource_type: &str,
2166 search_params_str: &str,
2167 ) -> StorageResult<Vec<StoredResource>> {
2168 let parsed_params = parse_simple_search_params(search_params_str);
2170
2171 if parsed_params.is_empty() {
2172 return Ok(Vec::new());
2175 }
2176
2177 let search_params = self.build_search_parameters(resource_type, &parsed_params)?;
2179
2180 let query = SearchQuery {
2182 resource_type: resource_type.to_string(),
2183 parameters: search_params,
2184 count: Some(1000), ..Default::default()
2187 };
2188
2189 let result = <Self as SearchProvider>::search(self, tenant, &query).await?;
2191
2192 Ok(result.resources.items)
2193 }
2194
2195 fn build_search_parameters(
2200 &self,
2201 resource_type: &str,
2202 params: &[(String, String)],
2203 ) -> StorageResult<Vec<SearchParameter>> {
2204 let registry = self.search_registry().read();
2205 let mut search_params = Vec::with_capacity(params.len());
2206
2207 for (name, value) in params {
2208 let param_type = self
2210 .lookup_param_type(®istry, resource_type, name)
2211 .unwrap_or({
2212 match name.as_str() {
2214 "_id" => SearchParamType::Token,
2215 "_lastUpdated" => SearchParamType::Date,
2216 "_tag" | "_profile" | "_security" => SearchParamType::Token,
2217 "identifier" => SearchParamType::Token,
2218 "patient" | "subject" | "encounter" | "performer" | "author"
2220 | "requester" | "recorder" | "asserter" | "practitioner"
2221 | "organization" | "location" | "device" => SearchParamType::Reference,
2222 _ => SearchParamType::String, }
2224 });
2225
2226 search_params.push(SearchParameter {
2227 name: name.clone(),
2228 param_type,
2229 modifier: None,
2230 values: vec![SearchValue::parse(value)],
2231 chain: vec![],
2232 components: vec![],
2233 });
2234 }
2235
2236 Ok(search_params)
2237 }
2238
2239 fn lookup_param_type(
2243 &self,
2244 registry: &crate::search::SearchParameterRegistry,
2245 resource_type: &str,
2246 param_name: &str,
2247 ) -> Option<SearchParamType> {
2248 if let Some(def) = registry.get_param(resource_type, param_name) {
2250 return Some(def.param_type);
2251 }
2252
2253 if let Some(def) = registry.get_param("Resource", param_name) {
2255 return Some(def.param_type);
2256 }
2257
2258 None
2259 }
2260
2261 fn apply_json_patch(&self, resource: &Value, patch_doc: &Value) -> StorageResult<Value> {
2275 use crate::error::ValidationError;
2276
2277 let patch: json_patch::Patch = serde_json::from_value(patch_doc.clone()).map_err(|e| {
2279 StorageError::Validation(ValidationError::InvalidResource {
2280 message: format!("Invalid JSON Patch document: {}", e),
2281 details: vec![],
2282 })
2283 })?;
2284
2285 let mut patched = resource.clone();
2287 json_patch::patch(&mut patched, &patch).map_err(|e| {
2288 StorageError::Validation(ValidationError::InvalidResource {
2289 message: format!("Failed to apply JSON Patch: {}", e),
2290 details: vec![],
2291 })
2292 })?;
2293
2294 Ok(patched)
2295 }
2296
2297 fn apply_fhirpath_patch(&self, resource: &Value, patch_params: &Value) -> StorageResult<Value> {
2308 use crate::error::ValidationError;
2309
2310 let parameter = patch_params.get("parameter").and_then(|p| p.as_array());
2312 if parameter.is_none() {
2313 return Err(StorageError::Validation(ValidationError::InvalidResource {
2314 message: "FHIRPath Patch must have a 'parameter' array".to_string(),
2315 details: vec![],
2316 }));
2317 }
2318
2319 let mut patched = resource.clone();
2320
2321 for operation in parameter.unwrap() {
2322 let parts = operation.get("part").and_then(|p| p.as_array());
2324 if parts.is_none() {
2325 continue;
2326 }
2327
2328 let mut op_type = None;
2329 let mut op_path = None;
2330 let mut op_name = None;
2331 let mut op_value = None;
2332
2333 for part in parts.unwrap() {
2334 match part.get("name").and_then(|n| n.as_str()) {
2335 Some("type") => {
2336 op_type = part
2337 .get("valueCode")
2338 .and_then(|v| v.as_str())
2339 .map(|s| s.to_string());
2340 }
2341 Some("path") => {
2342 op_path = part
2343 .get("valueString")
2344 .and_then(|v| v.as_str())
2345 .map(|s| s.to_string());
2346 }
2347 Some("name") => {
2348 op_name = part
2349 .get("valueString")
2350 .and_then(|v| v.as_str())
2351 .map(|s| s.to_string());
2352 }
2353 Some("value") => {
2354 op_value = part
2356 .get("valueString")
2357 .or_else(|| part.get("valueBoolean"))
2358 .or_else(|| part.get("valueInteger"))
2359 .or_else(|| part.get("valueDecimal"))
2360 .or_else(|| part.get("valueCode"))
2361 .cloned();
2362 }
2363 _ => {}
2364 }
2365 }
2366
2367 match op_type.as_deref() {
2369 Some("replace") => {
2370 if let (Some(path), Some(value)) = (&op_path, &op_value) {
2371 self.fhirpath_replace(&mut patched, path, value)?;
2372 }
2373 }
2374 Some("add") => {
2375 if let (Some(path), Some(name), Some(value)) = (&op_path, &op_name, &op_value) {
2376 self.fhirpath_add(&mut patched, path, name, value)?;
2377 }
2378 }
2379 Some("delete") => {
2380 if let Some(path) = &op_path {
2381 self.fhirpath_delete(&mut patched, path)?;
2382 }
2383 }
2384 _ => {
2385 }
2387 }
2388 }
2389
2390 Ok(patched)
2391 }
2392
2393 fn fhirpath_replace(
2395 &self,
2396 resource: &mut Value,
2397 path: &str,
2398 value: &Value,
2399 ) -> StorageResult<()> {
2400 let parts: Vec<&str> = path.split('.').collect();
2403 if parts.len() == 2 {
2404 if let Some(obj) = resource.as_object_mut() {
2406 obj.insert(parts[1].to_string(), value.clone());
2407 }
2408 }
2409 Ok(())
2410 }
2411
2412 fn fhirpath_add(
2414 &self,
2415 resource: &mut Value,
2416 path: &str,
2417 name: &str,
2418 value: &Value,
2419 ) -> StorageResult<()> {
2420 let parts: Vec<&str> = path.split('.').collect();
2422 if parts.len() == 1
2423 && parts[0]
2424 == resource
2425 .get("resourceType")
2426 .and_then(|r| r.as_str())
2427 .unwrap_or("")
2428 {
2429 if let Some(obj) = resource.as_object_mut() {
2431 obj.insert(name.to_string(), value.clone());
2432 }
2433 }
2434 Ok(())
2435 }
2436
2437 fn fhirpath_delete(&self, resource: &mut Value, path: &str) -> StorageResult<()> {
2439 let parts: Vec<&str> = path.split('.').collect();
2441 if parts.len() == 2 {
2442 if let Some(obj) = resource.as_object_mut() {
2443 obj.remove(parts[1]);
2444 }
2445 }
2446 Ok(())
2447 }
2448
2449 fn apply_merge_patch(&self, resource: &Value, merge_doc: &Value) -> Value {
2456 let mut patched = resource.clone();
2457 json_patch::merge(&mut patched, merge_doc);
2458 patched
2459 }
2460}
2461
2462#[async_trait]
2463impl BundleProvider for SqliteBackend {
2464 async fn process_transaction(
2465 &self,
2466 tenant: &TenantContext,
2467 entries: Vec<BundleEntry>,
2468 ) -> Result<BundleResult, TransactionError> {
2469 use crate::core::transaction::{Transaction, TransactionOptions, TransactionProvider};
2470 use std::collections::HashMap;
2471
2472 let mut tx = self
2474 .begin_transaction(tenant, TransactionOptions::new())
2475 .await
2476 .map_err(|e| TransactionError::RolledBack {
2477 reason: format!("Failed to begin transaction: {}", e),
2478 })?;
2479
2480 let mut results = Vec::with_capacity(entries.len());
2481 let mut error_info: Option<(usize, String)> = None;
2482
2483 let mut reference_map: HashMap<String, String> = HashMap::new();
2486
2487 let mut entries = entries;
2489
2490 for (idx, entry) in entries.iter_mut().enumerate() {
2492 if let Some(ref mut resource) = entry.resource {
2494 resolve_bundle_references(resource, &reference_map);
2495 }
2496
2497 let result = self.process_bundle_entry_tx(&mut tx, entry).await;
2498
2499 match result {
2500 Ok(entry_result) => {
2501 if entry_result.status >= 400 {
2503 error_info = Some((
2504 idx,
2505 format!("Entry failed with status {}", entry_result.status),
2506 ));
2507 break;
2508 }
2509
2510 if entry.method == BundleMethod::Post {
2512 if let Some(ref full_url) = entry.full_url {
2513 if let Some(ref location) = entry_result.location {
2514 let reference = location
2517 .split("/_history")
2518 .next()
2519 .unwrap_or(location)
2520 .to_string();
2521 reference_map.insert(full_url.clone(), reference);
2522 }
2523 }
2524 }
2525
2526 results.push(entry_result);
2527 }
2528 Err(e) => {
2529 error_info = Some((idx, format!("Entry processing failed: {}", e)));
2530 break;
2531 }
2532 }
2533 }
2534
2535 if let Some((index, message)) = error_info {
2537 let _ = Box::new(tx).rollback().await;
2538 return Err(TransactionError::BundleError { index, message });
2539 }
2540
2541 Box::new(tx)
2543 .commit()
2544 .await
2545 .map_err(|e| TransactionError::RolledBack {
2546 reason: format!("Commit failed: {}", e),
2547 })?;
2548
2549 Ok(BundleResult {
2550 bundle_type: BundleType::Transaction,
2551 entries: results,
2552 })
2553 }
2554
2555 async fn process_batch(
2556 &self,
2557 tenant: &TenantContext,
2558 entries: Vec<BundleEntry>,
2559 ) -> StorageResult<BundleResult> {
2560 let mut results = Vec::with_capacity(entries.len());
2561
2562 for entry in &entries {
2564 let result = self.process_batch_entry(tenant, entry).await;
2565 results.push(result);
2566 }
2567
2568 Ok(BundleResult {
2569 bundle_type: BundleType::Batch,
2570 entries: results,
2571 })
2572 }
2573}
2574
2575impl SqliteBackend {
2576 async fn process_bundle_entry_tx(
2578 &self,
2579 tx: &mut crate::backends::sqlite::transaction::SqliteTransaction,
2580 entry: &BundleEntry,
2581 ) -> StorageResult<BundleEntryResult> {
2582 use crate::core::transaction::Transaction;
2583
2584 match entry.method {
2585 BundleMethod::Get => {
2586 let (resource_type, id) = self.parse_url(&entry.url)?;
2588 match tx.read(&resource_type, &id).await? {
2589 Some(resource) => Ok(BundleEntryResult::ok(resource)),
2590 None => Ok(BundleEntryResult::error(
2591 404,
2592 serde_json::json!({
2593 "resourceType": "OperationOutcome",
2594 "issue": [{"severity": "error", "code": "not-found"}]
2595 }),
2596 )),
2597 }
2598 }
2599 BundleMethod::Post => {
2600 let resource = entry.resource.clone().ok_or_else(|| {
2602 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2603 field: "resource".to_string(),
2604 })
2605 })?;
2606
2607 let resource_type = resource
2608 .get("resourceType")
2609 .and_then(|v| v.as_str())
2610 .map(|s| s.to_string())
2611 .ok_or_else(|| {
2612 StorageError::Validation(
2613 crate::error::ValidationError::MissingRequiredField {
2614 field: "resourceType".to_string(),
2615 },
2616 )
2617 })?;
2618
2619 let created = tx.create(&resource_type, resource).await?;
2620 Ok(BundleEntryResult::created(created))
2621 }
2622 BundleMethod::Put => {
2623 let resource = entry.resource.clone().ok_or_else(|| {
2625 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2626 field: "resource".to_string(),
2627 })
2628 })?;
2629
2630 let (resource_type, id) = self.parse_url(&entry.url)?;
2631
2632 match tx.read(&resource_type, &id).await? {
2634 Some(existing) => {
2635 if let Some(ref if_match) = entry.if_match {
2637 let current_etag = existing.etag();
2638 if current_etag != if_match.as_str() {
2639 return Ok(BundleEntryResult::error(
2640 412,
2641 serde_json::json!({
2642 "resourceType": "OperationOutcome",
2643 "issue": [{"severity": "error", "code": "conflict", "diagnostics": "ETag mismatch"}]
2644 }),
2645 ));
2646 }
2647 }
2648 let updated = tx.update(&existing, resource).await?;
2649 Ok(BundleEntryResult::ok(updated))
2650 }
2651 None => {
2652 let mut resource_with_id = resource;
2654 resource_with_id["id"] = serde_json::json!(id);
2655 let created = tx.create(&resource_type, resource_with_id).await?;
2656 Ok(BundleEntryResult::created(created))
2657 }
2658 }
2659 }
2660 BundleMethod::Delete => {
2661 let (resource_type, id) = self.parse_url(&entry.url)?;
2662 tx.delete(&resource_type, &id).await?;
2663 Ok(BundleEntryResult::deleted())
2664 }
2665 BundleMethod::Patch => {
2666 Ok(BundleEntryResult::error(
2668 501,
2669 serde_json::json!({
2670 "resourceType": "OperationOutcome",
2671 "issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented"}]
2672 }),
2673 ))
2674 }
2675 }
2676 }
2677
2678 async fn process_batch_entry(
2680 &self,
2681 tenant: &TenantContext,
2682 entry: &BundleEntry,
2683 ) -> BundleEntryResult {
2684 match self.process_batch_entry_inner(tenant, entry).await {
2685 Ok(result) => result,
2686 Err(e) => BundleEntryResult::error(
2687 500,
2688 serde_json::json!({
2689 "resourceType": "OperationOutcome",
2690 "issue": [{"severity": "error", "code": "exception", "diagnostics": e.to_string()}]
2691 }),
2692 ),
2693 }
2694 }
2695
2696 async fn process_batch_entry_inner(
2697 &self,
2698 tenant: &TenantContext,
2699 entry: &BundleEntry,
2700 ) -> StorageResult<BundleEntryResult> {
2701 match entry.method {
2702 BundleMethod::Get => {
2703 let (resource_type, id) = self.parse_url(&entry.url)?;
2704 match self.read(tenant, &resource_type, &id).await? {
2705 Some(resource) => Ok(BundleEntryResult::ok(resource)),
2706 None => Ok(BundleEntryResult::error(
2707 404,
2708 serde_json::json!({
2709 "resourceType": "OperationOutcome",
2710 "issue": [{"severity": "error", "code": "not-found"}]
2711 }),
2712 )),
2713 }
2714 }
2715 BundleMethod::Post => {
2716 let resource = entry.resource.clone().ok_or_else(|| {
2717 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2718 field: "resource".to_string(),
2719 })
2720 })?;
2721
2722 let resource_type = resource
2723 .get("resourceType")
2724 .and_then(|v| v.as_str())
2725 .map(|s| s.to_string())
2726 .ok_or_else(|| {
2727 StorageError::Validation(
2728 crate::error::ValidationError::MissingRequiredField {
2729 field: "resourceType".to_string(),
2730 },
2731 )
2732 })?;
2733
2734 let created = self
2736 .create(tenant, &resource_type, resource, FhirVersion::default())
2737 .await?;
2738 Ok(BundleEntryResult::created(created))
2739 }
2740 BundleMethod::Put => {
2741 let resource = entry.resource.clone().ok_or_else(|| {
2742 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2743 field: "resource".to_string(),
2744 })
2745 })?;
2746
2747 let (resource_type, id) = self.parse_url(&entry.url)?;
2748 let (stored, _created) = self
2750 .create_or_update(
2751 tenant,
2752 &resource_type,
2753 &id,
2754 resource,
2755 FhirVersion::default(),
2756 )
2757 .await?;
2758 Ok(BundleEntryResult::ok(stored))
2759 }
2760 BundleMethod::Delete => {
2761 let (resource_type, id) = self.parse_url(&entry.url)?;
2762 match self.delete(tenant, &resource_type, &id).await {
2763 Ok(()) => Ok(BundleEntryResult::deleted()),
2764 Err(StorageError::Resource(ResourceError::NotFound { .. })) => {
2765 Ok(BundleEntryResult::deleted()) }
2767 Err(e) => Err(e),
2768 }
2769 }
2770 BundleMethod::Patch => Ok(BundleEntryResult::error(
2771 501,
2772 serde_json::json!({
2773 "resourceType": "OperationOutcome",
2774 "issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented"}]
2775 }),
2776 )),
2777 }
2778 }
2779
2780 fn parse_url(&self, url: &str) -> StorageResult<(String, String)> {
2782 let path = url
2787 .strip_prefix("http://")
2788 .or_else(|| url.strip_prefix("https://"))
2789 .map(|s| {
2790 s.find('/').map(|i| &s[i..]).unwrap_or(s)
2792 })
2793 .unwrap_or(url);
2794
2795 let path = path.trim_start_matches('/');
2796 let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
2797
2798 if parts.len() >= 2 {
2801 let len = parts.len();
2802 Ok((parts[len - 2].to_string(), parts[len - 1].to_string()))
2803 } else {
2804 Err(StorageError::Validation(
2805 crate::error::ValidationError::InvalidReference {
2806 reference: url.to_string(),
2807 message: "URL must be in format ResourceType/id".to_string(),
2808 },
2809 ))
2810 }
2811 }
2812}
2813
2814fn resolve_bundle_references(
2819 value: &mut serde_json::Value,
2820 reference_map: &std::collections::HashMap<String, String>,
2821) {
2822 use serde_json::Value;
2823 match value {
2824 Value::Object(map) => {
2825 if let Some(Value::String(ref_str)) = map.get("reference") {
2827 if ref_str.starts_with("urn:uuid:") {
2828 if let Some(resolved) = reference_map.get(ref_str) {
2829 map.insert("reference".to_string(), Value::String(resolved.clone()));
2830 }
2831 }
2832 }
2833 for v in map.values_mut() {
2835 resolve_bundle_references(v, reference_map);
2836 }
2837 }
2838 Value::Array(arr) => {
2839 for item in arr {
2840 resolve_bundle_references(item, reference_map);
2841 }
2842 }
2843 _ => {}
2844 }
2845}
2846
2847#[async_trait]
2849impl ReindexableStorage for SqliteBackend {
2850 async fn list_resource_types(&self, tenant: &TenantContext) -> StorageResult<Vec<String>> {
2851 let conn = self.get_connection()?;
2852 let tenant_id = tenant.tenant_id().as_str().to_string();
2853
2854 let mut stmt = conn
2855 .prepare(
2856 "SELECT DISTINCT resource_type FROM resources WHERE tenant_id = ?1 AND is_deleted = 0",
2857 )
2858 .map_err(|e| internal_error(format!("Failed to prepare statement: {}", e)))?;
2859
2860 let types: Vec<String> = stmt
2861 .query_map([&tenant_id], |row| row.get(0))
2862 .map_err(|e| internal_error(format!("Failed to query resource types: {}", e)))?
2863 .filter_map(|r| r.ok())
2864 .collect();
2865
2866 Ok(types)
2867 }
2868
2869 async fn count_resources(
2870 &self,
2871 tenant: &TenantContext,
2872 resource_type: &str,
2873 ) -> StorageResult<u64> {
2874 self.count(tenant, Some(resource_type)).await
2875 }
2876
2877 async fn fetch_resources_page(
2878 &self,
2879 tenant: &TenantContext,
2880 resource_type: &str,
2881 cursor: Option<&str>,
2882 limit: u32,
2883 ) -> StorageResult<ResourcePage> {
2884 let conn = self.get_connection()?;
2885 let tenant_id = tenant.tenant_id().as_str().to_string();
2886
2887 let (cursor_ts, cursor_id) = if let Some(c) = cursor {
2889 let parts: Vec<&str> = c.split('|').collect();
2890 if parts.len() == 2 {
2891 (Some(parts[0].to_string()), Some(parts[1].to_string()))
2892 } else {
2893 (None, None)
2894 }
2895 } else {
2896 (None, None)
2897 };
2898
2899 let (sql, params): (String, Vec<Box<dyn ToSql>>) =
2901 if let (Some(ts), Some(id)) = (&cursor_ts, &cursor_id) {
2902 (
2903 "SELECT id, version_id, data, last_updated, fhir_version FROM resources \
2904 WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0 \
2905 AND (last_updated > ?3 OR (last_updated = ?3 AND id > ?4)) \
2906 ORDER BY last_updated ASC, id ASC LIMIT ?5"
2907 .to_string(),
2908 vec![
2909 Box::new(tenant_id.clone()) as Box<dyn ToSql>,
2910 Box::new(resource_type.to_string()),
2911 Box::new(ts.clone()),
2912 Box::new(id.clone()),
2913 Box::new(limit as i64),
2914 ],
2915 )
2916 } else {
2917 (
2918 "SELECT id, version_id, data, last_updated, fhir_version FROM resources \
2919 WHERE tenant_id = ?1 AND resource_type = ?2 AND is_deleted = 0 \
2920 ORDER BY last_updated ASC, id ASC LIMIT ?3"
2921 .to_string(),
2922 vec![
2923 Box::new(tenant_id.clone()) as Box<dyn ToSql>,
2924 Box::new(resource_type.to_string()),
2925 Box::new(limit as i64),
2926 ],
2927 )
2928 };
2929
2930 let mut stmt = conn
2931 .prepare(&sql)
2932 .map_err(|e| internal_error(format!("Failed to prepare statement: {}", e)))?;
2933
2934 let param_refs: Vec<&dyn ToSql> = params.iter().map(|p| p.as_ref()).collect();
2935
2936 let resources: Vec<StoredResource> = stmt
2937 .query_map(param_refs.as_slice(), |row| {
2938 let id: String = row.get(0)?;
2939 let version_id: String = row.get(1)?;
2940 let data: Vec<u8> = row.get(2)?;
2941 let last_updated: String = row.get(3)?;
2942 let fhir_version: String = row.get(4)?;
2943
2944 Ok((id, version_id, data, last_updated, fhir_version))
2945 })
2946 .map_err(|e| internal_error(format!("Failed to query resources: {}", e)))?
2947 .filter_map(|r| r.ok())
2948 .filter_map(|(id, version_id, data, last_updated, fhir_version_str)| {
2949 let content: Value = serde_json::from_slice(&data).ok()?;
2950 let last_modified = chrono::DateTime::parse_from_rfc3339(&last_updated)
2951 .ok()?
2952 .with_timezone(&Utc);
2953 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
2954 Some(StoredResource::from_storage(
2955 resource_type.to_string(),
2956 id,
2957 version_id,
2958 tenant.tenant_id().clone(),
2959 content,
2960 last_modified, last_modified,
2962 None, fhir_version,
2964 ))
2965 })
2966 .collect();
2967
2968 let next_cursor = if resources.len() == limit as usize {
2970 resources
2971 .last()
2972 .map(|r| format!("{}|{}", r.last_modified().to_rfc3339(), r.id()))
2973 } else {
2974 None
2975 };
2976
2977 Ok(ResourcePage {
2978 resources,
2979 next_cursor,
2980 })
2981 }
2982
2983 async fn delete_search_entries(
2984 &self,
2985 tenant: &TenantContext,
2986 resource_type: &str,
2987 resource_id: &str,
2988 ) -> StorageResult<()> {
2989 let conn = self.get_connection()?;
2990 self.delete_search_index(
2991 &conn,
2992 tenant.tenant_id().as_str(),
2993 resource_type,
2994 resource_id,
2995 )
2996 }
2997
2998 async fn write_search_entries(
2999 &self,
3000 tenant: &TenantContext,
3001 resource_type: &str,
3002 resource_id: &str,
3003 resource: &Value,
3004 ) -> StorageResult<usize> {
3005 let conn = self.get_connection()?;
3006
3007 let values = self
3009 .search_extractor()
3010 .extract(resource, resource_type)
3011 .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
3012
3013 let mut count = 0;
3014 for value in values {
3015 self.write_index_entry(
3016 &conn,
3017 tenant.tenant_id().as_str(),
3018 resource_type,
3019 resource_id,
3020 &value,
3021 )?;
3022 count += 1;
3023 }
3024
3025 Ok(count)
3026 }
3027
3028 async fn clear_search_index(&self, tenant: &TenantContext) -> StorageResult<u64> {
3029 let conn = self.get_connection()?;
3030 let tenant_id = tenant.tenant_id().as_str();
3031
3032 let deleted = conn
3033 .execute(
3034 "DELETE FROM search_index WHERE tenant_id = ?1",
3035 params![tenant_id],
3036 )
3037 .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
3038
3039 Ok(deleted as u64)
3040 }
3041}
3042
3043#[cfg(test)]
3044mod tests {
3045 use super::*;
3046 use crate::core::history::HistoryParams;
3047 use crate::tenant::{TenantId, TenantPermissions};
3048 use serde_json::json;
3049 use std::path::PathBuf;
3050
3051 use crate::backends::sqlite::SqliteBackendConfig;
3052
3053 fn create_test_backend() -> SqliteBackend {
3054 let data_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3057 .parent()
3058 .and_then(|p| p.parent())
3059 .map(|p| p.join("data"))
3060 .unwrap_or_else(|| PathBuf::from("data"));
3061
3062 let config = SqliteBackendConfig {
3063 data_dir: Some(data_dir),
3064 ..Default::default()
3065 };
3066 let backend = SqliteBackend::with_config(":memory:", config).unwrap();
3067 backend.init_schema().unwrap();
3068 backend
3069 }
3070
3071 fn create_test_tenant() -> TenantContext {
3072 TenantContext::new(
3073 TenantId::new("test-tenant"),
3074 TenantPermissions::full_access(),
3075 )
3076 }
3077
3078 #[tokio::test]
3079 async fn test_create_and_read() {
3080 let backend = create_test_backend();
3081 let tenant = create_test_tenant();
3082
3083 let resource = json!({
3084 "resourceType": "Patient",
3085 "name": [{"family": "Test", "given": ["User"]}]
3086 });
3087
3088 let created = backend
3090 .create(&tenant, "Patient", resource, FhirVersion::default())
3091 .await
3092 .unwrap();
3093 assert_eq!(created.resource_type(), "Patient");
3094 assert_eq!(created.version_id(), "1");
3095
3096 let read = backend
3098 .read(&tenant, "Patient", created.id())
3099 .await
3100 .unwrap();
3101 assert!(read.is_some());
3102 let read = read.unwrap();
3103 assert_eq!(read.version_id(), "1");
3104 }
3105
3106 #[tokio::test]
3107 async fn test_create_with_id() {
3108 let backend = create_test_backend();
3109 let tenant = create_test_tenant();
3110
3111 let resource = json!({
3112 "resourceType": "Patient",
3113 "id": "patient-123",
3114 "name": [{"family": "Test"}]
3115 });
3116
3117 let created = backend
3118 .create(&tenant, "Patient", resource, FhirVersion::default())
3119 .await
3120 .unwrap();
3121 assert_eq!(created.id(), "patient-123");
3122 }
3123
3124 #[tokio::test]
3125 async fn test_create_duplicate_fails() {
3126 let backend = create_test_backend();
3127 let tenant = create_test_tenant();
3128
3129 let resource = json!({"id": "patient-1"});
3130 backend
3131 .create(&tenant, "Patient", resource.clone(), FhirVersion::default())
3132 .await
3133 .unwrap();
3134
3135 let result = backend
3136 .create(&tenant, "Patient", resource, FhirVersion::default())
3137 .await;
3138 assert!(matches!(
3139 result,
3140 Err(StorageError::Resource(ResourceError::AlreadyExists { .. }))
3141 ));
3142 }
3143
3144 #[tokio::test]
3145 async fn test_read_nonexistent() {
3146 let backend = create_test_backend();
3147 let tenant = create_test_tenant();
3148
3149 let result = backend
3150 .read(&tenant, "Patient", "nonexistent")
3151 .await
3152 .unwrap();
3153 assert!(result.is_none());
3154 }
3155
3156 #[tokio::test]
3157 async fn test_update() {
3158 let backend = create_test_backend();
3159 let tenant = create_test_tenant();
3160
3161 let resource = json!({"name": [{"family": "Original"}]});
3163 let created = backend
3164 .create(&tenant, "Patient", resource, FhirVersion::default())
3165 .await
3166 .unwrap();
3167
3168 let updated_content = json!({"name": [{"family": "Updated"}]});
3170 let updated = backend
3171 .update(&tenant, &created, updated_content)
3172 .await
3173 .unwrap();
3174 assert_eq!(updated.version_id(), "2");
3175
3176 let read = backend
3178 .read(&tenant, "Patient", created.id())
3179 .await
3180 .unwrap()
3181 .unwrap();
3182 assert_eq!(read.content()["name"][0]["family"], "Updated");
3183 }
3184
3185 #[tokio::test]
3186 async fn test_update_version_conflict() {
3187 let backend = create_test_backend();
3188 let tenant = create_test_tenant();
3189
3190 let resource = json!({});
3192 let created = backend
3193 .create(&tenant, "Patient", resource, FhirVersion::default())
3194 .await
3195 .unwrap();
3196
3197 let _ = backend.update(&tenant, &created, json!({})).await.unwrap();
3199
3200 let result = backend.update(&tenant, &created, json!({})).await;
3202 assert!(matches!(
3203 result,
3204 Err(StorageError::Concurrency(
3205 ConcurrencyError::VersionConflict { .. }
3206 ))
3207 ));
3208 }
3209
3210 #[tokio::test]
3211 async fn test_delete() {
3212 let backend = create_test_backend();
3213 let tenant = create_test_tenant();
3214
3215 let resource = json!({});
3217 let created = backend
3218 .create(&tenant, "Patient", resource, FhirVersion::default())
3219 .await
3220 .unwrap();
3221
3222 backend
3224 .delete(&tenant, "Patient", created.id())
3225 .await
3226 .unwrap();
3227
3228 let result = backend.read(&tenant, "Patient", created.id()).await;
3230 assert!(matches!(
3231 result,
3232 Err(StorageError::Resource(ResourceError::Gone { .. }))
3233 ));
3234 }
3235
3236 #[tokio::test]
3237 async fn test_create_or_update_new() {
3238 let backend = create_test_backend();
3239 let tenant = create_test_tenant();
3240
3241 let (resource, created) = backend
3242 .create_or_update(
3243 &tenant,
3244 "Patient",
3245 "new-id",
3246 json!({}),
3247 FhirVersion::default(),
3248 )
3249 .await
3250 .unwrap();
3251
3252 assert!(created);
3253 assert_eq!(resource.id(), "new-id");
3254 assert_eq!(resource.version_id(), "1");
3255 }
3256
3257 #[tokio::test]
3258 async fn test_create_or_update_existing() {
3259 let backend = create_test_backend();
3260 let tenant = create_test_tenant();
3261
3262 backend
3264 .create(
3265 &tenant,
3266 "Patient",
3267 json!({"id": "existing-id"}),
3268 FhirVersion::default(),
3269 )
3270 .await
3271 .unwrap();
3272
3273 let (resource, created) = backend
3275 .create_or_update(
3276 &tenant,
3277 "Patient",
3278 "existing-id",
3279 json!({}),
3280 FhirVersion::default(),
3281 )
3282 .await
3283 .unwrap();
3284
3285 assert!(!created);
3286 assert_eq!(resource.version_id(), "2");
3287 }
3288
3289 #[tokio::test]
3290 async fn test_count() {
3291 let backend = create_test_backend();
3292 let tenant = create_test_tenant();
3293
3294 assert_eq!(backend.count(&tenant, Some("Patient")).await.unwrap(), 0);
3296
3297 backend
3299 .create(&tenant, "Patient", json!({}), FhirVersion::default())
3300 .await
3301 .unwrap();
3302 backend
3303 .create(&tenant, "Patient", json!({}), FhirVersion::default())
3304 .await
3305 .unwrap();
3306 backend
3307 .create(&tenant, "Observation", json!({}), FhirVersion::default())
3308 .await
3309 .unwrap();
3310
3311 assert_eq!(backend.count(&tenant, Some("Patient")).await.unwrap(), 2);
3312 assert_eq!(
3313 backend.count(&tenant, Some("Observation")).await.unwrap(),
3314 1
3315 );
3316 assert_eq!(backend.count(&tenant, None).await.unwrap(), 3);
3317 }
3318
3319 #[tokio::test]
3320 async fn test_tenant_isolation() {
3321 let backend = create_test_backend();
3322
3323 let tenant1 =
3324 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
3325 let tenant2 =
3326 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
3327
3328 let resource = json!({"id": "patient-1"});
3330 backend
3331 .create(&tenant1, "Patient", resource, FhirVersion::default())
3332 .await
3333 .unwrap();
3334
3335 assert!(
3337 backend
3338 .read(&tenant1, "Patient", "patient-1")
3339 .await
3340 .unwrap()
3341 .is_some()
3342 );
3343
3344 assert!(
3346 backend
3347 .read(&tenant2, "Patient", "patient-1")
3348 .await
3349 .unwrap()
3350 .is_none()
3351 );
3352 }
3353
3354 #[tokio::test]
3359 async fn test_history_instance_basic() {
3360 let backend = create_test_backend();
3361 let tenant = create_test_tenant();
3362
3363 let resource = json!({"name": [{"family": "Smith"}]});
3365 let created = backend
3366 .create(&tenant, "Patient", resource, FhirVersion::default())
3367 .await
3368 .unwrap();
3369
3370 let v2 = backend
3372 .update(&tenant, &created, json!({"name": [{"family": "Jones"}]}))
3373 .await
3374 .unwrap();
3375 let _v3 = backend
3376 .update(&tenant, &v2, json!({"name": [{"family": "Brown"}]}))
3377 .await
3378 .unwrap();
3379
3380 let params = HistoryParams::new();
3382 let history = backend
3383 .history_instance(&tenant, "Patient", created.id(), ¶ms)
3384 .await
3385 .unwrap();
3386
3387 assert_eq!(history.items.len(), 3);
3389 assert_eq!(history.items[0].resource.version_id(), "3");
3390 assert_eq!(history.items[1].resource.version_id(), "2");
3391 assert_eq!(history.items[2].resource.version_id(), "1");
3392
3393 assert_eq!(history.items[0].method, HistoryMethod::Put);
3395 assert_eq!(history.items[1].method, HistoryMethod::Put);
3396 assert_eq!(history.items[2].method, HistoryMethod::Post);
3397 }
3398
3399 #[tokio::test]
3400 async fn test_history_instance_count() {
3401 let backend = create_test_backend();
3402 let tenant = create_test_tenant();
3403
3404 let resource = json!({});
3406 let created = backend
3407 .create(&tenant, "Patient", resource, FhirVersion::default())
3408 .await
3409 .unwrap();
3410 let v2 = backend.update(&tenant, &created, json!({})).await.unwrap();
3411 let _v3 = backend.update(&tenant, &v2, json!({})).await.unwrap();
3412
3413 let count = backend
3414 .history_instance_count(&tenant, "Patient", created.id())
3415 .await
3416 .unwrap();
3417 assert_eq!(count, 3);
3418 }
3419
3420 #[tokio::test]
3421 async fn test_history_instance_with_delete() {
3422 let backend = create_test_backend();
3423 let tenant = create_test_tenant();
3424
3425 let resource = json!({"id": "p1"});
3427 let created = backend
3428 .create(&tenant, "Patient", resource, FhirVersion::default())
3429 .await
3430 .unwrap();
3431 let _v2 = backend
3432 .update(&tenant, &created, json!({"id": "p1"}))
3433 .await
3434 .unwrap();
3435 backend.delete(&tenant, "Patient", "p1").await.unwrap();
3436
3437 let params = HistoryParams::new().include_deleted(true);
3439 let history = backend
3440 .history_instance(&tenant, "Patient", "p1", ¶ms)
3441 .await
3442 .unwrap();
3443
3444 assert_eq!(history.items.len(), 3);
3445 assert_eq!(history.items[0].method, HistoryMethod::Delete);
3446 assert_eq!(history.items[0].resource.version_id(), "3");
3447 }
3448
3449 #[tokio::test]
3450 async fn test_history_instance_exclude_deleted() {
3451 let backend = create_test_backend();
3452 let tenant = create_test_tenant();
3453
3454 let resource = json!({"id": "p2"});
3456 let created = backend
3457 .create(&tenant, "Patient", resource, FhirVersion::default())
3458 .await
3459 .unwrap();
3460 let _v2 = backend
3461 .update(&tenant, &created, json!({"id": "p2"}))
3462 .await
3463 .unwrap();
3464 backend.delete(&tenant, "Patient", "p2").await.unwrap();
3465
3466 let params = HistoryParams::new().include_deleted(false);
3468 let history = backend
3469 .history_instance(&tenant, "Patient", "p2", ¶ms)
3470 .await
3471 .unwrap();
3472
3473 assert_eq!(history.items.len(), 2);
3475 assert_eq!(history.items[0].resource.version_id(), "2");
3476 assert_eq!(history.items[1].resource.version_id(), "1");
3477 }
3478
3479 #[tokio::test]
3480 async fn test_history_instance_pagination() {
3481 let backend = create_test_backend();
3482 let tenant = create_test_tenant();
3483
3484 let resource = json!({});
3486 let mut current = backend
3487 .create(&tenant, "Patient", resource, FhirVersion::default())
3488 .await
3489 .unwrap();
3490 for _ in 0..4 {
3491 current = backend.update(&tenant, ¤t, json!({})).await.unwrap();
3492 }
3493 let params = HistoryParams::new().count(2);
3497 let page1 = backend
3498 .history_instance(&tenant, "Patient", current.id(), ¶ms)
3499 .await
3500 .unwrap();
3501
3502 assert_eq!(page1.items.len(), 2);
3503 assert_eq!(page1.items[0].resource.version_id(), "5");
3504 assert_eq!(page1.items[1].resource.version_id(), "4");
3505 assert!(page1.page_info.has_next);
3506 }
3507
3508 #[tokio::test]
3509 async fn test_history_instance_nonexistent() {
3510 let backend = create_test_backend();
3511 let tenant = create_test_tenant();
3512
3513 let params = HistoryParams::new();
3514 let history = backend
3515 .history_instance(&tenant, "Patient", "nonexistent", ¶ms)
3516 .await
3517 .unwrap();
3518
3519 assert!(history.items.is_empty());
3520 }
3521
3522 #[tokio::test]
3523 async fn test_history_instance_tenant_isolation() {
3524 let backend = create_test_backend();
3525 let tenant1 =
3526 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
3527 let tenant2 =
3528 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
3529
3530 let resource = json!({"id": "shared-id"});
3532 let created = backend
3533 .create(&tenant1, "Patient", resource, FhirVersion::default())
3534 .await
3535 .unwrap();
3536 let _v2 = backend
3537 .update(&tenant1, &created, json!({"id": "shared-id"}))
3538 .await
3539 .unwrap();
3540
3541 let history1 = backend
3543 .history_instance(&tenant1, "Patient", "shared-id", &HistoryParams::new())
3544 .await
3545 .unwrap();
3546 assert_eq!(history1.items.len(), 2);
3547
3548 let history2 = backend
3550 .history_instance(&tenant2, "Patient", "shared-id", &HistoryParams::new())
3551 .await
3552 .unwrap();
3553 assert!(history2.items.is_empty());
3554 }
3555
3556 #[tokio::test]
3561 async fn test_history_type_basic() {
3562 let backend = create_test_backend();
3563 let tenant = create_test_tenant();
3564
3565 let p1 = backend
3567 .create(
3568 &tenant,
3569 "Patient",
3570 json!({"id": "p1"}),
3571 FhirVersion::default(),
3572 )
3573 .await
3574 .unwrap();
3575 let _p2 = backend
3576 .create(
3577 &tenant,
3578 "Patient",
3579 json!({"id": "p2"}),
3580 FhirVersion::default(),
3581 )
3582 .await
3583 .unwrap();
3584
3585 let _p1_v2 = backend
3587 .update(&tenant, &p1, json!({"id": "p1"}))
3588 .await
3589 .unwrap();
3590
3591 let params = HistoryParams::new();
3593 let history = backend
3594 .history_type(&tenant, "Patient", ¶ms)
3595 .await
3596 .unwrap();
3597
3598 assert_eq!(history.items.len(), 3);
3600
3601 for entry in &history.items {
3603 assert_eq!(entry.resource.resource_type(), "Patient");
3604 }
3605 }
3606
3607 #[tokio::test]
3608 async fn test_history_type_count() {
3609 let backend = create_test_backend();
3610 let tenant = create_test_tenant();
3611
3612 let p1 = backend
3614 .create(&tenant, "Patient", json!({}), FhirVersion::default())
3615 .await
3616 .unwrap();
3617 let _p1_v2 = backend.update(&tenant, &p1, json!({})).await.unwrap();
3618 let _p2 = backend
3619 .create(&tenant, "Patient", json!({}), FhirVersion::default())
3620 .await
3621 .unwrap();
3622
3623 backend
3625 .create(&tenant, "Observation", json!({}), FhirVersion::default())
3626 .await
3627 .unwrap();
3628
3629 let count = backend
3631 .history_type_count(&tenant, "Patient")
3632 .await
3633 .unwrap();
3634 assert_eq!(count, 3); let obs_count = backend
3638 .history_type_count(&tenant, "Observation")
3639 .await
3640 .unwrap();
3641 assert_eq!(obs_count, 1);
3642 }
3643
3644 #[tokio::test]
3645 async fn test_history_type_filters_by_type() {
3646 let backend = create_test_backend();
3647 let tenant = create_test_tenant();
3648
3649 backend
3651 .create(&tenant, "Patient", json!({}), FhirVersion::default())
3652 .await
3653 .unwrap();
3654 backend
3655 .create(&tenant, "Observation", json!({}), FhirVersion::default())
3656 .await
3657 .unwrap();
3658 backend
3659 .create(&tenant, "Encounter", json!({}), FhirVersion::default())
3660 .await
3661 .unwrap();
3662
3663 let history = backend
3665 .history_type(&tenant, "Patient", &HistoryParams::new())
3666 .await
3667 .unwrap();
3668 assert_eq!(history.items.len(), 1);
3669 assert_eq!(history.items[0].resource.resource_type(), "Patient");
3670
3671 let obs_history = backend
3673 .history_type(&tenant, "Observation", &HistoryParams::new())
3674 .await
3675 .unwrap();
3676 assert_eq!(obs_history.items.len(), 1);
3677 assert_eq!(obs_history.items[0].resource.resource_type(), "Observation");
3678 }
3679
3680 #[tokio::test]
3681 async fn test_history_type_includes_deleted() {
3682 let backend = create_test_backend();
3683 let tenant = create_test_tenant();
3684
3685 let _p1 = backend
3687 .create(
3688 &tenant,
3689 "Patient",
3690 json!({"id": "del-p1"}),
3691 FhirVersion::default(),
3692 )
3693 .await
3694 .unwrap();
3695 backend.delete(&tenant, "Patient", "del-p1").await.unwrap();
3696
3697 backend
3699 .create(
3700 &tenant,
3701 "Patient",
3702 json!({"id": "p2"}),
3703 FhirVersion::default(),
3704 )
3705 .await
3706 .unwrap();
3707
3708 let history = backend
3710 .history_type(&tenant, "Patient", &HistoryParams::new())
3711 .await
3712 .unwrap();
3713 assert_eq!(history.items.len(), 2); let history_with_deleted = backend
3717 .history_type(
3718 &tenant,
3719 "Patient",
3720 &HistoryParams::new().include_deleted(true),
3721 )
3722 .await
3723 .unwrap();
3724 assert_eq!(history_with_deleted.items.len(), 3); }
3726
3727 #[tokio::test]
3728 async fn test_history_type_tenant_isolation() {
3729 let backend = create_test_backend();
3730 let tenant1 =
3731 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
3732 let tenant2 =
3733 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
3734
3735 backend
3737 .create(&tenant1, "Patient", json!({}), FhirVersion::default())
3738 .await
3739 .unwrap();
3740 backend
3741 .create(&tenant1, "Patient", json!({}), FhirVersion::default())
3742 .await
3743 .unwrap();
3744
3745 backend
3747 .create(&tenant2, "Patient", json!({}), FhirVersion::default())
3748 .await
3749 .unwrap();
3750
3751 let history1 = backend
3753 .history_type(&tenant1, "Patient", &HistoryParams::new())
3754 .await
3755 .unwrap();
3756 assert_eq!(history1.items.len(), 2);
3757
3758 let history2 = backend
3760 .history_type(&tenant2, "Patient", &HistoryParams::new())
3761 .await
3762 .unwrap();
3763 assert_eq!(history2.items.len(), 1);
3764 }
3765
3766 #[tokio::test]
3767 async fn test_history_type_pagination() {
3768 let backend = create_test_backend();
3769 let tenant = create_test_tenant();
3770
3771 for i in 0..5 {
3773 backend
3774 .create(
3775 &tenant,
3776 "Patient",
3777 json!({"id": format!("p{}", i)}),
3778 FhirVersion::default(),
3779 )
3780 .await
3781 .unwrap();
3782 }
3783
3784 let params = HistoryParams::new().count(2);
3786 let page1 = backend
3787 .history_type(&tenant, "Patient", ¶ms)
3788 .await
3789 .unwrap();
3790
3791 assert_eq!(page1.items.len(), 2);
3792 assert!(page1.page_info.has_next);
3793 }
3794
3795 #[tokio::test]
3796 async fn test_history_type_empty() {
3797 let backend = create_test_backend();
3798 let tenant = create_test_tenant();
3799
3800 let history = backend
3802 .history_type(&tenant, "Patient", &HistoryParams::new())
3803 .await
3804 .unwrap();
3805 assert!(history.items.is_empty());
3806 assert!(!history.page_info.has_next);
3807 }
3808
3809 #[tokio::test]
3814 async fn test_history_system_basic() {
3815 let backend = create_test_backend();
3816 let tenant = create_test_tenant();
3817
3818 let p1 = backend
3820 .create(
3821 &tenant,
3822 "Patient",
3823 json!({"id": "p1"}),
3824 FhirVersion::default(),
3825 )
3826 .await
3827 .unwrap();
3828 backend
3829 .create(
3830 &tenant,
3831 "Observation",
3832 json!({"id": "o1"}),
3833 FhirVersion::default(),
3834 )
3835 .await
3836 .unwrap();
3837 backend
3838 .create(
3839 &tenant,
3840 "Encounter",
3841 json!({"id": "e1"}),
3842 FhirVersion::default(),
3843 )
3844 .await
3845 .unwrap();
3846
3847 let _p1_v2 = backend
3849 .update(&tenant, &p1, json!({"id": "p1"}))
3850 .await
3851 .unwrap();
3852
3853 let history = backend
3855 .history_system(&tenant, &HistoryParams::new())
3856 .await
3857 .unwrap();
3858
3859 assert_eq!(history.items.len(), 4);
3861
3862 let types: std::collections::HashSet<_> = history
3864 .items
3865 .iter()
3866 .map(|e| e.resource.resource_type())
3867 .collect();
3868 assert!(types.contains("Patient"));
3869 assert!(types.contains("Observation"));
3870 assert!(types.contains("Encounter"));
3871 }
3872
3873 #[tokio::test]
3874 async fn test_history_system_count() {
3875 let backend = create_test_backend();
3876 let tenant = create_test_tenant();
3877
3878 let p1 = backend
3880 .create(&tenant, "Patient", json!({}), FhirVersion::default())
3881 .await
3882 .unwrap();
3883 let _p1_v2 = backend.update(&tenant, &p1, json!({})).await.unwrap();
3884 backend
3885 .create(&tenant, "Observation", json!({}), FhirVersion::default())
3886 .await
3887 .unwrap();
3888 backend
3889 .create(&tenant, "Encounter", json!({}), FhirVersion::default())
3890 .await
3891 .unwrap();
3892
3893 let count = backend.history_system_count(&tenant).await.unwrap();
3895 assert_eq!(count, 4); }
3897
3898 #[tokio::test]
3899 async fn test_history_system_includes_deleted() {
3900 let backend = create_test_backend();
3901 let tenant = create_test_tenant();
3902
3903 backend
3905 .create(
3906 &tenant,
3907 "Patient",
3908 json!({"id": "del-p1"}),
3909 FhirVersion::default(),
3910 )
3911 .await
3912 .unwrap();
3913 backend.delete(&tenant, "Patient", "del-p1").await.unwrap();
3914
3915 backend
3917 .create(&tenant, "Observation", json!({}), FhirVersion::default())
3918 .await
3919 .unwrap();
3920
3921 let history = backend
3923 .history_system(&tenant, &HistoryParams::new())
3924 .await
3925 .unwrap();
3926 assert_eq!(history.items.len(), 2); let history_with_deleted = backend
3930 .history_system(&tenant, &HistoryParams::new().include_deleted(true))
3931 .await
3932 .unwrap();
3933 assert_eq!(history_with_deleted.items.len(), 3); }
3935
3936 #[tokio::test]
3937 async fn test_history_system_tenant_isolation() {
3938 let backend = create_test_backend();
3939 let tenant1 =
3940 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
3941 let tenant2 =
3942 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
3943
3944 backend
3946 .create(&tenant1, "Patient", json!({}), FhirVersion::default())
3947 .await
3948 .unwrap();
3949 backend
3950 .create(&tenant1, "Observation", json!({}), FhirVersion::default())
3951 .await
3952 .unwrap();
3953
3954 backend
3956 .create(&tenant2, "Encounter", json!({}), FhirVersion::default())
3957 .await
3958 .unwrap();
3959
3960 let history1 = backend
3962 .history_system(&tenant1, &HistoryParams::new())
3963 .await
3964 .unwrap();
3965 assert_eq!(history1.items.len(), 2);
3966
3967 let history2 = backend
3969 .history_system(&tenant2, &HistoryParams::new())
3970 .await
3971 .unwrap();
3972 assert_eq!(history2.items.len(), 1);
3973
3974 assert_eq!(backend.history_system_count(&tenant1).await.unwrap(), 2);
3976 assert_eq!(backend.history_system_count(&tenant2).await.unwrap(), 1);
3977 }
3978
3979 #[tokio::test]
3980 async fn test_history_system_pagination() {
3981 let backend = create_test_backend();
3982 let tenant = create_test_tenant();
3983
3984 for i in 0..3 {
3986 backend
3987 .create(
3988 &tenant,
3989 "Patient",
3990 json!({"id": format!("p{}", i)}),
3991 FhirVersion::default(),
3992 )
3993 .await
3994 .unwrap();
3995 }
3996 for i in 0..2 {
3997 backend
3998 .create(
3999 &tenant,
4000 "Observation",
4001 json!({"id": format!("o{}", i)}),
4002 FhirVersion::default(),
4003 )
4004 .await
4005 .unwrap();
4006 }
4007 let params = HistoryParams::new().count(2);
4011 let page1 = backend.history_system(&tenant, ¶ms).await.unwrap();
4012
4013 assert_eq!(page1.items.len(), 2);
4014 assert!(page1.page_info.has_next);
4015 }
4016
4017 #[tokio::test]
4018 async fn test_history_system_empty() {
4019 let backend = create_test_backend();
4020 let tenant = create_test_tenant();
4021
4022 let history = backend
4024 .history_system(&tenant, &HistoryParams::new())
4025 .await
4026 .unwrap();
4027 assert!(history.items.is_empty());
4028 assert!(!history.page_info.has_next);
4029
4030 assert_eq!(backend.history_system_count(&tenant).await.unwrap(), 0);
4031 }
4032
4033 #[tokio::test]
4034 async fn test_history_system_ordered_by_time() {
4035 let backend = create_test_backend();
4036 let tenant = create_test_tenant();
4037
4038 backend
4040 .create(
4041 &tenant,
4042 "Patient",
4043 json!({"id": "first"}),
4044 FhirVersion::default(),
4045 )
4046 .await
4047 .unwrap();
4048 backend
4049 .create(
4050 &tenant,
4051 "Observation",
4052 json!({"id": "second"}),
4053 FhirVersion::default(),
4054 )
4055 .await
4056 .unwrap();
4057 backend
4058 .create(
4059 &tenant,
4060 "Encounter",
4061 json!({"id": "third"}),
4062 FhirVersion::default(),
4063 )
4064 .await
4065 .unwrap();
4066
4067 let history = backend
4068 .history_system(&tenant, &HistoryParams::new())
4069 .await
4070 .unwrap();
4071
4072 assert_eq!(history.items.len(), 3);
4074 assert_eq!(history.items[0].resource.id(), "third");
4076 assert_eq!(history.items[1].resource.id(), "second");
4077 assert_eq!(history.items[2].resource.id(), "first");
4078 }
4079
4080 #[tokio::test]
4085 async fn test_delete_instance_history() {
4086 let backend = create_test_backend();
4087 let tenant = create_test_tenant();
4088
4089 let p1 = backend
4091 .create(
4092 &tenant,
4093 "Patient",
4094 json!({"id": "p1", "name": [{"family": "Smith"}]}),
4095 FhirVersion::default(),
4096 )
4097 .await
4098 .unwrap();
4099 let p1_v2 = backend
4100 .update(
4101 &tenant,
4102 &p1,
4103 json!({"id": "p1", "name": [{"family": "Jones"}]}),
4104 )
4105 .await
4106 .unwrap();
4107 let _p1_v3 = backend
4108 .update(
4109 &tenant,
4110 &p1_v2,
4111 json!({"id": "p1", "name": [{"family": "Brown"}]}),
4112 )
4113 .await
4114 .unwrap();
4115
4116 let history = backend
4118 .history_instance(&tenant, "Patient", "p1", &HistoryParams::new())
4119 .await
4120 .unwrap();
4121 assert_eq!(history.items.len(), 3);
4122
4123 let deleted_count = backend
4125 .delete_instance_history(&tenant, "Patient", "p1")
4126 .await
4127 .unwrap();
4128 assert_eq!(deleted_count, 2); let history = backend
4132 .history_instance(&tenant, "Patient", "p1", &HistoryParams::new())
4133 .await
4134 .unwrap();
4135 assert_eq!(history.items.len(), 1);
4136 assert_eq!(history.items[0].resource.version_id(), "3");
4137
4138 let resource = backend.read(&tenant, "Patient", "p1").await.unwrap();
4140 assert!(resource.is_some());
4141 assert_eq!(resource.unwrap().version_id(), "3");
4142 }
4143
4144 #[tokio::test]
4145 async fn test_delete_instance_history_nonexistent() {
4146 let backend = create_test_backend();
4147 let tenant = create_test_tenant();
4148
4149 let result = backend
4151 .delete_instance_history(&tenant, "Patient", "nonexistent")
4152 .await;
4153
4154 assert!(matches!(
4155 result,
4156 Err(StorageError::Resource(ResourceError::NotFound { .. }))
4157 ));
4158 }
4159
4160 #[tokio::test]
4161 async fn test_delete_version() {
4162 let backend = create_test_backend();
4163 let tenant = create_test_tenant();
4164
4165 let p1 = backend
4167 .create(
4168 &tenant,
4169 "Patient",
4170 json!({"id": "p1", "name": [{"family": "Smith"}]}),
4171 FhirVersion::default(),
4172 )
4173 .await
4174 .unwrap();
4175 let p1_v2 = backend
4176 .update(
4177 &tenant,
4178 &p1,
4179 json!({"id": "p1", "name": [{"family": "Jones"}]}),
4180 )
4181 .await
4182 .unwrap();
4183 let _p1_v3 = backend
4184 .update(
4185 &tenant,
4186 &p1_v2,
4187 json!({"id": "p1", "name": [{"family": "Brown"}]}),
4188 )
4189 .await
4190 .unwrap();
4191
4192 backend
4194 .delete_version(&tenant, "Patient", "p1", "2")
4195 .await
4196 .unwrap();
4197
4198 let history = backend
4200 .history_instance(&tenant, "Patient", "p1", &HistoryParams::new())
4201 .await
4202 .unwrap();
4203 assert_eq!(history.items.len(), 2);
4204 let versions: Vec<&str> = history
4205 .items
4206 .iter()
4207 .map(|e| e.resource.version_id())
4208 .collect();
4209 assert!(versions.contains(&"1"));
4210 assert!(versions.contains(&"3"));
4211 assert!(!versions.contains(&"2"));
4212 }
4213
4214 #[tokio::test]
4215 async fn test_delete_version_current_fails() {
4216 let backend = create_test_backend();
4217 let tenant = create_test_tenant();
4218
4219 let p1 = backend
4221 .create(
4222 &tenant,
4223 "Patient",
4224 json!({"id": "p1"}),
4225 FhirVersion::default(),
4226 )
4227 .await
4228 .unwrap();
4229 let _p1_v2 = backend
4230 .update(&tenant, &p1, json!({"id": "p1"}))
4231 .await
4232 .unwrap();
4233
4234 let result = backend.delete_version(&tenant, "Patient", "p1", "2").await;
4236
4237 assert!(matches!(result, Err(StorageError::Validation(_))));
4239 }
4240
4241 #[tokio::test]
4242 async fn test_delete_version_nonexistent() {
4243 let backend = create_test_backend();
4244 let tenant = create_test_tenant();
4245
4246 backend
4248 .create(
4249 &tenant,
4250 "Patient",
4251 json!({"id": "p1"}),
4252 FhirVersion::default(),
4253 )
4254 .await
4255 .unwrap();
4256
4257 let result = backend
4259 .delete_version(&tenant, "Patient", "p1", "999")
4260 .await;
4261
4262 assert!(matches!(
4263 result,
4264 Err(StorageError::Resource(
4265 ResourceError::VersionNotFound { .. }
4266 ))
4267 ));
4268 }
4269
4270 #[tokio::test]
4271 async fn test_delete_version_resource_not_found() {
4272 let backend = create_test_backend();
4273 let tenant = create_test_tenant();
4274
4275 let result = backend
4277 .delete_version(&tenant, "Patient", "nonexistent", "1")
4278 .await;
4279
4280 assert!(matches!(
4281 result,
4282 Err(StorageError::Resource(ResourceError::NotFound { .. }))
4283 ));
4284 }
4285
4286 #[tokio::test]
4291 async fn test_purge_single_resource() {
4292 let backend = create_test_backend();
4293 let tenant = create_test_tenant();
4294
4295 let p1 = backend
4297 .create(
4298 &tenant,
4299 "Patient",
4300 json!({"id": "p1"}),
4301 FhirVersion::default(),
4302 )
4303 .await
4304 .unwrap();
4305 let _p1_v2 = backend
4306 .update(&tenant, &p1, json!({"id": "p1"}))
4307 .await
4308 .unwrap();
4309
4310 backend.purge(&tenant, "Patient", "p1").await.unwrap();
4312
4313 let read_result = backend.read(&tenant, "Patient", "p1").await.unwrap();
4315 assert!(read_result.is_none());
4316
4317 let history = backend
4319 .history_instance(&tenant, "Patient", "p1", &HistoryParams::new())
4320 .await
4321 .unwrap();
4322 assert!(history.items.is_empty());
4323 }
4324
4325 #[tokio::test]
4326 async fn test_purge_deleted_resource() {
4327 let backend = create_test_backend();
4328 let tenant = create_test_tenant();
4329
4330 backend
4332 .create(
4333 &tenant,
4334 "Patient",
4335 json!({"id": "del-p1"}),
4336 FhirVersion::default(),
4337 )
4338 .await
4339 .unwrap();
4340 backend.delete(&tenant, "Patient", "del-p1").await.unwrap();
4341
4342 backend.purge(&tenant, "Patient", "del-p1").await.unwrap();
4344
4345 let history = backend
4347 .history_instance(
4348 &tenant,
4349 "Patient",
4350 "del-p1",
4351 &HistoryParams::new().include_deleted(true),
4352 )
4353 .await
4354 .unwrap();
4355 assert!(history.items.is_empty());
4356 }
4357
4358 #[tokio::test]
4359 async fn test_purge_nonexistent_resource() {
4360 let backend = create_test_backend();
4361 let tenant = create_test_tenant();
4362
4363 let result = backend.purge(&tenant, "Patient", "nonexistent").await;
4365 assert!(result.is_err());
4366 }
4367
4368 #[tokio::test]
4369 async fn test_purge_tenant_isolation() {
4370 let backend = create_test_backend();
4371 let tenant1 =
4372 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
4373 let tenant2 =
4374 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
4375
4376 backend
4378 .create(
4379 &tenant1,
4380 "Patient",
4381 json!({"id": "shared-id"}),
4382 FhirVersion::default(),
4383 )
4384 .await
4385 .unwrap();
4386
4387 backend
4389 .create(
4390 &tenant2,
4391 "Patient",
4392 json!({"id": "shared-id"}),
4393 FhirVersion::default(),
4394 )
4395 .await
4396 .unwrap();
4397
4398 backend
4400 .purge(&tenant1, "Patient", "shared-id")
4401 .await
4402 .unwrap();
4403
4404 let t2_read = backend
4406 .read(&tenant2, "Patient", "shared-id")
4407 .await
4408 .unwrap();
4409 assert!(t2_read.is_some());
4410
4411 let t1_read = backend
4413 .read(&tenant1, "Patient", "shared-id")
4414 .await
4415 .unwrap();
4416 assert!(t1_read.is_none());
4417 }
4418
4419 #[tokio::test]
4420 async fn test_purge_all_single_type() {
4421 let backend = create_test_backend();
4422 let tenant = create_test_tenant();
4423
4424 for i in 0..5 {
4426 backend
4427 .create(
4428 &tenant,
4429 "Patient",
4430 json!({"id": format!("p{}", i)}),
4431 FhirVersion::default(),
4432 )
4433 .await
4434 .unwrap();
4435 }
4436
4437 backend
4439 .create(&tenant, "Observation", json!({}), FhirVersion::default())
4440 .await
4441 .unwrap();
4442
4443 let count = backend.purge_all(&tenant, "Patient").await.unwrap();
4445 assert_eq!(count, 5);
4446
4447 let patient_history = backend
4449 .history_type(&tenant, "Patient", &HistoryParams::new())
4450 .await
4451 .unwrap();
4452 assert!(patient_history.items.is_empty());
4453
4454 let obs_history = backend
4456 .history_type(&tenant, "Observation", &HistoryParams::new())
4457 .await
4458 .unwrap();
4459 assert_eq!(obs_history.items.len(), 1);
4460 }
4461
4462 #[tokio::test]
4463 async fn test_purge_all_empty_type() {
4464 let backend = create_test_backend();
4465 let tenant = create_test_tenant();
4466
4467 let count = backend.purge_all(&tenant, "Patient").await.unwrap();
4469 assert_eq!(count, 0);
4470 }
4471
4472 #[tokio::test]
4473 async fn test_purge_all_tenant_isolation() {
4474 let backend = create_test_backend();
4475 let tenant1 =
4476 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
4477 let tenant2 =
4478 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
4479
4480 for i in 0..3 {
4482 backend
4483 .create(
4484 &tenant1,
4485 "Patient",
4486 json!({"id": format!("t1-p{}", i)}),
4487 FhirVersion::default(),
4488 )
4489 .await
4490 .unwrap();
4491 }
4492 for i in 0..2 {
4493 backend
4494 .create(
4495 &tenant2,
4496 "Patient",
4497 json!({"id": format!("t2-p{}", i)}),
4498 FhirVersion::default(),
4499 )
4500 .await
4501 .unwrap();
4502 }
4503
4504 let count = backend.purge_all(&tenant1, "Patient").await.unwrap();
4506 assert_eq!(count, 3);
4507
4508 let t2_history = backend
4510 .history_type(&tenant2, "Patient", &HistoryParams::new())
4511 .await
4512 .unwrap();
4513 assert_eq!(t2_history.items.len(), 2);
4514 }
4515
4516 #[tokio::test]
4521 async fn test_modified_since_basic() {
4522 let backend = create_test_backend();
4523 let tenant = create_test_tenant();
4524
4525 let before_create = Utc::now();
4527
4528 backend
4530 .create(
4531 &tenant,
4532 "Patient",
4533 json!({"id": "p1"}),
4534 FhirVersion::default(),
4535 )
4536 .await
4537 .unwrap();
4538 backend
4539 .create(
4540 &tenant,
4541 "Patient",
4542 json!({"id": "p2"}),
4543 FhirVersion::default(),
4544 )
4545 .await
4546 .unwrap();
4547 backend
4548 .create(
4549 &tenant,
4550 "Observation",
4551 json!({"id": "o1"}),
4552 FhirVersion::default(),
4553 )
4554 .await
4555 .unwrap();
4556
4557 let pagination = Pagination::default();
4559 let result = backend
4560 .modified_since(&tenant, None, before_create, &pagination)
4561 .await
4562 .unwrap();
4563
4564 assert_eq!(result.items.len(), 3);
4566 }
4567
4568 #[tokio::test]
4569 async fn test_modified_since_with_type_filter() {
4570 let backend = create_test_backend();
4571 let tenant = create_test_tenant();
4572
4573 let before_create = Utc::now();
4574
4575 backend
4577 .create(
4578 &tenant,
4579 "Patient",
4580 json!({"id": "p1"}),
4581 FhirVersion::default(),
4582 )
4583 .await
4584 .unwrap();
4585 backend
4586 .create(
4587 &tenant,
4588 "Patient",
4589 json!({"id": "p2"}),
4590 FhirVersion::default(),
4591 )
4592 .await
4593 .unwrap();
4594 backend
4595 .create(
4596 &tenant,
4597 "Observation",
4598 json!({"id": "o1"}),
4599 FhirVersion::default(),
4600 )
4601 .await
4602 .unwrap();
4603
4604 let pagination = Pagination::default();
4606 let result = backend
4607 .modified_since(&tenant, Some("Patient"), before_create, &pagination)
4608 .await
4609 .unwrap();
4610
4611 assert_eq!(result.items.len(), 2);
4613 for resource in &result.items {
4614 assert_eq!(resource.resource_type(), "Patient");
4615 }
4616 }
4617
4618 #[tokio::test]
4619 async fn test_modified_since_excludes_older() {
4620 let backend = create_test_backend();
4621 let tenant = create_test_tenant();
4622
4623 backend
4625 .create(
4626 &tenant,
4627 "Patient",
4628 json!({"id": "old"}),
4629 FhirVersion::default(),
4630 )
4631 .await
4632 .unwrap();
4633
4634 let after_first = Utc::now();
4636
4637 backend
4639 .create(
4640 &tenant,
4641 "Patient",
4642 json!({"id": "new"}),
4643 FhirVersion::default(),
4644 )
4645 .await
4646 .unwrap();
4647
4648 let pagination = Pagination::default();
4650 let result = backend
4651 .modified_since(&tenant, None, after_first, &pagination)
4652 .await
4653 .unwrap();
4654
4655 assert_eq!(result.items.len(), 1);
4657 assert_eq!(result.items[0].id(), "new");
4658 }
4659
4660 #[tokio::test]
4661 async fn test_modified_since_tenant_isolation() {
4662 let backend = create_test_backend();
4663 let tenant1 =
4664 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
4665 let tenant2 =
4666 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
4667
4668 let before_create = Utc::now();
4669
4670 backend
4672 .create(
4673 &tenant1,
4674 "Patient",
4675 json!({"id": "t1-p1"}),
4676 FhirVersion::default(),
4677 )
4678 .await
4679 .unwrap();
4680 backend
4681 .create(
4682 &tenant2,
4683 "Patient",
4684 json!({"id": "t2-p1"}),
4685 FhirVersion::default(),
4686 )
4687 .await
4688 .unwrap();
4689
4690 let pagination = Pagination::default();
4692 let result1 = backend
4693 .modified_since(&tenant1, None, before_create, &pagination)
4694 .await
4695 .unwrap();
4696 assert_eq!(result1.items.len(), 1);
4697 assert_eq!(result1.items[0].id(), "t1-p1");
4698
4699 let result2 = backend
4701 .modified_since(&tenant2, None, before_create, &pagination)
4702 .await
4703 .unwrap();
4704 assert_eq!(result2.items.len(), 1);
4705 assert_eq!(result2.items[0].id(), "t2-p1");
4706 }
4707
4708 #[tokio::test]
4709 async fn test_modified_since_excludes_deleted() {
4710 let backend = create_test_backend();
4711 let tenant = create_test_tenant();
4712
4713 let before_create = Utc::now();
4714
4715 backend
4717 .create(
4718 &tenant,
4719 "Patient",
4720 json!({"id": "del-p1"}),
4721 FhirVersion::default(),
4722 )
4723 .await
4724 .unwrap();
4725 backend.delete(&tenant, "Patient", "del-p1").await.unwrap();
4726
4727 backend
4729 .create(
4730 &tenant,
4731 "Patient",
4732 json!({"id": "live-p1"}),
4733 FhirVersion::default(),
4734 )
4735 .await
4736 .unwrap();
4737
4738 let pagination = Pagination::default();
4740 let result = backend
4741 .modified_since(&tenant, None, before_create, &pagination)
4742 .await
4743 .unwrap();
4744
4745 assert_eq!(result.items.len(), 1);
4747 assert_eq!(result.items[0].id(), "live-p1");
4748 }
4749
4750 #[tokio::test]
4751 async fn test_modified_since_pagination() {
4752 let backend = create_test_backend();
4753 let tenant = create_test_tenant();
4754
4755 let before_create = Utc::now();
4756
4757 for i in 0..5 {
4759 backend
4760 .create(
4761 &tenant,
4762 "Patient",
4763 json!({"id": format!("p{}", i)}),
4764 FhirVersion::default(),
4765 )
4766 .await
4767 .unwrap();
4768 }
4769
4770 let pagination = Pagination::cursor().with_count(2);
4772 let page1 = backend
4773 .modified_since(&tenant, None, before_create, &pagination)
4774 .await
4775 .unwrap();
4776
4777 assert_eq!(page1.items.len(), 2);
4778 assert!(page1.page_info.has_next);
4779 }
4780
4781 #[tokio::test]
4782 async fn test_modified_since_empty() {
4783 let backend = create_test_backend();
4784 let tenant = create_test_tenant();
4785
4786 let pagination = Pagination::default();
4788 let result = backend
4789 .modified_since(&tenant, None, Utc::now(), &pagination)
4790 .await
4791 .unwrap();
4792
4793 assert!(result.items.is_empty());
4794 assert!(!result.page_info.has_next);
4795 }
4796
4797 #[tokio::test]
4798 async fn test_modified_since_returns_current_version() {
4799 let backend = create_test_backend();
4800 let tenant = create_test_tenant();
4801
4802 let before_create = Utc::now();
4803
4804 let p1 = backend
4806 .create(
4807 &tenant,
4808 "Patient",
4809 json!({"id": "p1", "name": "v1"}),
4810 FhirVersion::default(),
4811 )
4812 .await
4813 .unwrap();
4814 let p1_v2 = backend
4815 .update(&tenant, &p1, json!({"id": "p1", "name": "v2"}))
4816 .await
4817 .unwrap();
4818 let _p1_v3 = backend
4819 .update(&tenant, &p1_v2, json!({"id": "p1", "name": "v3"}))
4820 .await
4821 .unwrap();
4822
4823 let pagination = Pagination::default();
4825 let result = backend
4826 .modified_since(&tenant, None, before_create, &pagination)
4827 .await
4828 .unwrap();
4829
4830 assert_eq!(result.items.len(), 1);
4831 assert_eq!(result.items[0].version_id(), "3");
4832 }
4833
4834 #[tokio::test]
4839 async fn test_conditional_create_no_match() {
4840 let backend = create_test_backend();
4841 let tenant = create_test_tenant();
4842
4843 let result = backend
4845 .conditional_create(
4846 &tenant,
4847 "Patient",
4848 json!({"identifier": [{"value": "12345"}]}),
4849 "identifier=99999", FhirVersion::default(),
4851 )
4852 .await
4853 .unwrap();
4854
4855 match result {
4856 ConditionalCreateResult::Created(resource) => {
4857 assert_eq!(resource.resource_type(), "Patient");
4858 }
4859 _ => panic!("Expected Created result"),
4860 }
4861 }
4862
4863 #[tokio::test]
4864 async fn test_conditional_create_single_match() {
4865 let backend = create_test_backend();
4866 let tenant = create_test_tenant();
4867
4868 let existing = backend
4870 .create(
4871 &tenant,
4872 "Patient",
4873 json!({"id": "p1", "identifier": [{"value": "12345"}]}),
4874 FhirVersion::default(),
4875 )
4876 .await
4877 .unwrap();
4878
4879 let result = backend
4881 .conditional_create(
4882 &tenant,
4883 "Patient",
4884 json!({"identifier": [{"value": "12345"}]}),
4885 "identifier=12345",
4886 FhirVersion::default(),
4887 )
4888 .await
4889 .unwrap();
4890
4891 match result {
4892 ConditionalCreateResult::Exists(resource) => {
4893 assert_eq!(resource.id(), existing.id());
4894 }
4895 _ => panic!("Expected Exists result"),
4896 }
4897 }
4898
4899 #[tokio::test]
4900 async fn test_conditional_create_by_id() {
4901 let backend = create_test_backend();
4902 let tenant = create_test_tenant();
4903
4904 backend
4906 .create(
4907 &tenant,
4908 "Patient",
4909 json!({"id": "p1"}),
4910 FhirVersion::default(),
4911 )
4912 .await
4913 .unwrap();
4914
4915 let result = backend
4917 .conditional_create(
4918 &tenant,
4919 "Patient",
4920 json!({}),
4921 "_id=p1",
4922 FhirVersion::default(),
4923 )
4924 .await
4925 .unwrap();
4926
4927 match result {
4928 ConditionalCreateResult::Exists(resource) => {
4929 assert_eq!(resource.id(), "p1");
4930 }
4931 _ => panic!("Expected Exists result"),
4932 }
4933 }
4934
4935 #[tokio::test]
4936 async fn test_conditional_update_single_match() {
4937 let backend = create_test_backend();
4938 let tenant = create_test_tenant();
4939
4940 backend
4942 .create(
4943 &tenant,
4944 "Patient",
4945 json!({"id": "p1", "identifier": [{"value": "12345"}], "active": false}),
4946 FhirVersion::default(),
4947 )
4948 .await
4949 .unwrap();
4950
4951 let result = backend
4953 .conditional_update(
4954 &tenant,
4955 "Patient",
4956 json!({"id": "p1", "identifier": [{"value": "12345"}], "active": true}),
4957 "identifier=12345",
4958 false,
4959 FhirVersion::default(),
4960 )
4961 .await
4962 .unwrap();
4963
4964 match result {
4965 ConditionalUpdateResult::Updated(resource) => {
4966 assert_eq!(resource.version_id(), "2");
4967 }
4968 _ => panic!("Expected Updated result"),
4969 }
4970 }
4971
4972 #[tokio::test]
4973 async fn test_conditional_update_no_match_no_upsert() {
4974 let backend = create_test_backend();
4975 let tenant = create_test_tenant();
4976
4977 let result = backend
4979 .conditional_update(
4980 &tenant,
4981 "Patient",
4982 json!({"identifier": [{"value": "99999"}]}),
4983 "identifier=99999",
4984 false,
4985 FhirVersion::default(),
4986 )
4987 .await
4988 .unwrap();
4989
4990 match result {
4991 ConditionalUpdateResult::NoMatch => {}
4992 _ => panic!("Expected NoMatch result"),
4993 }
4994 }
4995
4996 #[tokio::test]
4997 async fn test_conditional_update_no_match_with_upsert() {
4998 let backend = create_test_backend();
4999 let tenant = create_test_tenant();
5000
5001 let result = backend
5003 .conditional_update(
5004 &tenant,
5005 "Patient",
5006 json!({"identifier": [{"value": "new-id"}]}),
5007 "identifier=new-id",
5008 true,
5009 FhirVersion::default(),
5010 )
5011 .await
5012 .unwrap();
5013
5014 match result {
5015 ConditionalUpdateResult::Created(resource) => {
5016 assert_eq!(resource.resource_type(), "Patient");
5017 }
5018 _ => panic!("Expected Created result"),
5019 }
5020 }
5021
5022 #[tokio::test]
5023 async fn test_conditional_delete_single_match() {
5024 let backend = create_test_backend();
5025 let tenant = create_test_tenant();
5026
5027 backend
5029 .create(
5030 &tenant,
5031 "Patient",
5032 json!({"id": "p1"}),
5033 FhirVersion::default(),
5034 )
5035 .await
5036 .unwrap();
5037
5038 let result = backend
5040 .conditional_delete(&tenant, "Patient", "_id=p1")
5041 .await
5042 .unwrap();
5043
5044 match result {
5045 ConditionalDeleteResult::Deleted => {
5046 let read_result = backend.read(&tenant, "Patient", "p1").await;
5048 match read_result {
5049 Ok(None) => {} Err(StorageError::Resource(ResourceError::Gone { .. })) => {} other => panic!("Expected None or Gone, got {:?}", other),
5052 }
5053 }
5054 _ => panic!("Expected Deleted result"),
5055 }
5056 }
5057
5058 #[tokio::test]
5059 async fn test_conditional_delete_no_match() {
5060 let backend = create_test_backend();
5061 let tenant = create_test_tenant();
5062
5063 let result = backend
5065 .conditional_delete(&tenant, "Patient", "_id=nonexistent")
5066 .await
5067 .unwrap();
5068
5069 match result {
5070 ConditionalDeleteResult::NoMatch => {}
5071 _ => panic!("Expected NoMatch result"),
5072 }
5073 }
5074
5075 #[tokio::test]
5076 async fn test_conditional_operations_tenant_isolation() {
5077 let backend = create_test_backend();
5078 let tenant1 =
5079 TenantContext::new(TenantId::new("tenant-1"), TenantPermissions::full_access());
5080 let tenant2 =
5081 TenantContext::new(TenantId::new("tenant-2"), TenantPermissions::full_access());
5082
5083 backend
5085 .create(
5086 &tenant1,
5087 "Patient",
5088 json!({"id": "shared-id"}),
5089 FhirVersion::default(),
5090 )
5091 .await
5092 .unwrap();
5093
5094 let result = backend
5096 .conditional_create(
5097 &tenant2,
5098 "Patient",
5099 json!({}),
5100 "_id=shared-id",
5101 FhirVersion::default(),
5102 )
5103 .await
5104 .unwrap();
5105
5106 match result {
5107 ConditionalCreateResult::Created(_) => {}
5108 _ => panic!("Expected Created result (tenant isolation)"),
5109 }
5110 }
5111
5112 #[tokio::test]
5117 async fn test_conditional_patch_json_patch() {
5118 use crate::core::PatchFormat;
5119
5120 let backend = create_test_backend();
5121 let tenant = create_test_tenant();
5122
5123 backend
5125 .create(
5126 &tenant,
5127 "Patient",
5128 json!({"id": "p1", "active": false, "name": [{"family": "Smith"}]}),
5129 FhirVersion::default(),
5130 )
5131 .await
5132 .unwrap();
5133
5134 let patch = PatchFormat::JsonPatch(json!([
5136 {"op": "replace", "path": "/active", "value": true}
5137 ]));
5138
5139 let result = backend
5140 .conditional_patch(&tenant, "Patient", "_id=p1", &patch)
5141 .await
5142 .unwrap();
5143
5144 match result {
5145 crate::core::ConditionalPatchResult::Patched(resource) => {
5146 assert_eq!(resource.content()["active"], json!(true));
5147 }
5148 _ => panic!("Expected Patched result"),
5149 }
5150 }
5151
5152 #[tokio::test]
5153 async fn test_conditional_patch_merge_patch() {
5154 use crate::core::PatchFormat;
5155
5156 let backend = create_test_backend();
5157 let tenant = create_test_tenant();
5158
5159 backend
5161 .create(
5162 &tenant,
5163 "Patient",
5164 json!({"id": "p1", "active": false, "gender": "unknown"}),
5165 FhirVersion::default(),
5166 )
5167 .await
5168 .unwrap();
5169
5170 let patch = PatchFormat::MergePatch(json!({
5172 "active": true,
5173 "gender": null }));
5175
5176 let result = backend
5177 .conditional_patch(&tenant, "Patient", "_id=p1", &patch)
5178 .await
5179 .unwrap();
5180
5181 match result {
5182 crate::core::ConditionalPatchResult::Patched(resource) => {
5183 assert_eq!(resource.content()["active"], json!(true));
5184 assert!(resource.content().get("gender").is_none());
5185 }
5186 _ => panic!("Expected Patched result"),
5187 }
5188 }
5189
5190 #[tokio::test]
5191 async fn test_conditional_patch_no_match() {
5192 use crate::core::PatchFormat;
5193
5194 let backend = create_test_backend();
5195 let tenant = create_test_tenant();
5196
5197 let patch = PatchFormat::JsonPatch(json!([
5198 {"op": "replace", "path": "/active", "value": true}
5199 ]));
5200
5201 let result = backend
5202 .conditional_patch(&tenant, "Patient", "_id=nonexistent", &patch)
5203 .await
5204 .unwrap();
5205
5206 match result {
5207 crate::core::ConditionalPatchResult::NoMatch => {}
5208 _ => panic!("Expected NoMatch result"),
5209 }
5210 }
5211
5212 #[tokio::test]
5217 async fn test_batch_create_multiple() {
5218 use crate::core::transaction::BundleProvider;
5219
5220 let backend = create_test_backend();
5221 let tenant = create_test_tenant();
5222
5223 let entries = vec![
5224 BundleEntry {
5225 method: BundleMethod::Post,
5226 url: "Patient".to_string(),
5227 resource: Some(json!({"resourceType": "Patient", "id": "batch-p1"})),
5228 if_match: None,
5229 if_none_match: None,
5230 if_none_exist: None,
5231 full_url: None,
5232 },
5233 BundleEntry {
5234 method: BundleMethod::Post,
5235 url: "Patient".to_string(),
5236 resource: Some(json!({"resourceType": "Patient", "id": "batch-p2"})),
5237 if_match: None,
5238 if_none_match: None,
5239 if_none_exist: None,
5240 full_url: None,
5241 },
5242 ];
5243
5244 let result = backend.process_batch(&tenant, entries).await.unwrap();
5245
5246 assert_eq!(result.entries.len(), 2);
5247 assert_eq!(result.entries[0].status, 201);
5248 assert_eq!(result.entries[1].status, 201);
5249 }
5250
5251 #[tokio::test]
5252 async fn test_batch_mixed_operations() {
5253 use crate::core::transaction::BundleProvider;
5254
5255 let backend = create_test_backend();
5256 let tenant = create_test_tenant();
5257
5258 backend
5260 .create(
5261 &tenant,
5262 "Patient",
5263 json!({"id": "existing"}),
5264 FhirVersion::default(),
5265 )
5266 .await
5267 .unwrap();
5268
5269 let entries = vec![
5270 BundleEntry {
5272 method: BundleMethod::Get,
5273 url: "Patient/existing".to_string(),
5274 resource: None,
5275 if_match: None,
5276 if_none_match: None,
5277 if_none_exist: None,
5278 full_url: None,
5279 },
5280 BundleEntry {
5282 method: BundleMethod::Post,
5283 url: "Patient".to_string(),
5284 resource: Some(json!({"resourceType": "Patient", "id": "new"})),
5285 if_match: None,
5286 if_none_match: None,
5287 if_none_exist: None,
5288 full_url: None,
5289 },
5290 BundleEntry {
5292 method: BundleMethod::Get,
5293 url: "Patient/nonexistent".to_string(),
5294 resource: None,
5295 if_match: None,
5296 if_none_match: None,
5297 if_none_exist: None,
5298 full_url: None,
5299 },
5300 ];
5301
5302 let result = backend.process_batch(&tenant, entries).await.unwrap();
5303
5304 assert_eq!(result.entries.len(), 3);
5305 assert_eq!(result.entries[0].status, 200); assert_eq!(result.entries[1].status, 201); assert_eq!(result.entries[2].status, 404); }
5309
5310 #[tokio::test]
5311 async fn test_batch_delete() {
5312 use crate::core::transaction::BundleProvider;
5313
5314 let backend = create_test_backend();
5315 let tenant = create_test_tenant();
5316
5317 backend
5319 .create(
5320 &tenant,
5321 "Patient",
5322 json!({"id": "to-delete"}),
5323 FhirVersion::default(),
5324 )
5325 .await
5326 .unwrap();
5327
5328 let entries = vec![BundleEntry {
5329 method: BundleMethod::Delete,
5330 url: "Patient/to-delete".to_string(),
5331 resource: None,
5332 if_match: None,
5333 if_none_match: None,
5334 if_none_exist: None,
5335 full_url: None,
5336 }];
5337
5338 let result = backend.process_batch(&tenant, entries).await.unwrap();
5339
5340 assert_eq!(result.entries.len(), 1);
5341 assert_eq!(result.entries[0].status, 204);
5342
5343 let read_result = backend.read(&tenant, "Patient", "to-delete").await;
5345 match read_result {
5346 Ok(None) => {} Err(StorageError::Resource(ResourceError::Gone { .. })) => {} other => panic!("Expected None or Gone, got {:?}", other),
5349 }
5350 }
5351
5352 #[tokio::test]
5353 async fn test_transaction_all_or_nothing() {
5354 use crate::core::transaction::BundleProvider;
5355
5356 let backend = create_test_backend();
5357 let tenant = create_test_tenant();
5358
5359 backend
5361 .create(
5362 &tenant,
5363 "Patient",
5364 json!({"id": "existing"}),
5365 FhirVersion::default(),
5366 )
5367 .await
5368 .unwrap();
5369
5370 let entries = vec![
5371 BundleEntry {
5373 method: BundleMethod::Post,
5374 url: "Patient".to_string(),
5375 resource: Some(json!({"resourceType": "Patient", "id": "tx-p1"})),
5376 if_match: None,
5377 if_none_match: None,
5378 if_none_exist: None,
5379 full_url: None,
5380 },
5381 BundleEntry {
5383 method: BundleMethod::Post,
5384 url: "Patient".to_string(),
5385 resource: Some(json!({"resourceType": "Patient", "id": "existing"})),
5386 if_match: None,
5387 if_none_match: None,
5388 if_none_exist: None,
5389 full_url: None,
5390 },
5391 ];
5392
5393 let result = backend.process_transaction(&tenant, entries).await;
5394
5395 assert!(result.is_err());
5397
5398 let read = backend.read(&tenant, "Patient", "tx-p1").await.unwrap();
5400 assert!(read.is_none());
5401 }
5402
5403 #[tokio::test]
5404 async fn test_transaction_success() {
5405 use crate::core::transaction::BundleProvider;
5406
5407 let backend = create_test_backend();
5408 let tenant = create_test_tenant();
5409
5410 let entries = vec![
5411 BundleEntry {
5412 method: BundleMethod::Post,
5413 url: "Patient".to_string(),
5414 resource: Some(json!({"resourceType": "Patient", "id": "tx-success-1"})),
5415 if_match: None,
5416 if_none_match: None,
5417 if_none_exist: None,
5418 full_url: None,
5419 },
5420 BundleEntry {
5421 method: BundleMethod::Post,
5422 url: "Observation".to_string(),
5423 resource: Some(json!({"resourceType": "Observation", "id": "tx-success-2"})),
5424 if_match: None,
5425 if_none_match: None,
5426 if_none_exist: None,
5427 full_url: None,
5428 },
5429 ];
5430
5431 let result = backend.process_transaction(&tenant, entries).await.unwrap();
5432
5433 assert_eq!(result.entries.len(), 2);
5434 assert_eq!(result.entries[0].status, 201);
5435 assert_eq!(result.entries[1].status, 201);
5436
5437 assert!(
5439 backend
5440 .read(&tenant, "Patient", "tx-success-1")
5441 .await
5442 .unwrap()
5443 .is_some()
5444 );
5445 assert!(
5446 backend
5447 .read(&tenant, "Observation", "tx-success-2")
5448 .await
5449 .unwrap()
5450 .is_some()
5451 );
5452 }
5453
5454 #[tokio::test]
5455 async fn test_parse_url_formats() {
5456 let backend = create_test_backend();
5457
5458 let (rt, id) = backend.parse_url("Patient/123").unwrap();
5460 assert_eq!(rt, "Patient");
5461 assert_eq!(id, "123");
5462
5463 let (rt, id) = backend.parse_url("/Patient/456").unwrap();
5465 assert_eq!(rt, "Patient");
5466 assert_eq!(id, "456");
5467
5468 let (rt, id) = backend
5470 .parse_url("http://example.com/fhir/Patient/789")
5471 .unwrap();
5472 assert_eq!(rt, "Patient");
5473 assert_eq!(id, "789");
5474
5475 let (rt, id) = backend
5477 .parse_url("https://example.com/fhir/Observation/obs-1")
5478 .unwrap();
5479 assert_eq!(rt, "Observation");
5480 assert_eq!(id, "obs-1");
5481 }
5482
5483 #[tokio::test]
5488 async fn test_search_index_display_text_populated() {
5489 let backend = create_test_backend();
5490 let tenant = create_test_tenant();
5491
5492 backend
5494 .create(
5495 &tenant,
5496 "Observation",
5497 json!({
5498 "resourceType": "Observation",
5499 "id": "obs-display-test",
5500 "code": {
5501 "coding": [
5502 {
5503 "system": "http://loinc.org",
5504 "code": "8867-4",
5505 "display": "Heart rate"
5506 }
5507 ]
5508 },
5509 "status": "final"
5510 }),
5511 FhirVersion::default(),
5512 )
5513 .await
5514 .unwrap();
5515
5516 let conn = backend.get_connection().unwrap();
5518 let mut stmt = conn
5519 .prepare(
5520 "SELECT param_name, value_token_system, value_token_code, value_token_display
5521 FROM search_index
5522 WHERE tenant_id = 'test-tenant'
5523 AND resource_id = 'obs-display-test'
5524 AND param_name = 'code'",
5525 )
5526 .unwrap();
5527
5528 #[allow(clippy::type_complexity)]
5529 let rows: Vec<(String, Option<String>, Option<String>, Option<String>)> = stmt
5530 .query_map([], |row| {
5531 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
5532 })
5533 .unwrap()
5534 .filter_map(|r| r.ok())
5535 .collect();
5536
5537 assert!(
5539 !rows.is_empty(),
5540 "Should have indexed 'code' parameter for Observation"
5541 );
5542
5543 let entry = rows
5545 .iter()
5546 .find(|(_, _, code, _)| code.as_deref() == Some("8867-4"));
5547 assert!(entry.is_some(), "Should have entry with code 8867-4");
5548
5549 let (_, _, _, display) = entry.unwrap();
5551 assert_eq!(
5552 display.as_deref(),
5553 Some("Heart rate"),
5554 "Display text should be 'Heart rate'"
5555 );
5556 }
5557
5558 #[tokio::test]
5559 async fn test_search_index_identifier_type_populated() {
5560 let backend = create_test_backend();
5561 let tenant = create_test_tenant();
5562
5563 backend
5565 .create(
5566 &tenant,
5567 "Patient",
5568 json!({
5569 "resourceType": "Patient",
5570 "id": "patient-type-test",
5571 "identifier": [
5572 {
5573 "type": {
5574 "coding": [
5575 {
5576 "system": "http://terminology.hl7.org/CodeSystem/v2-0203",
5577 "code": "MR"
5578 }
5579 ]
5580 },
5581 "system": "http://hospital.org/mrn",
5582 "value": "MRN12345"
5583 }
5584 ]
5585 }),
5586 FhirVersion::default(),
5587 )
5588 .await
5589 .unwrap();
5590
5591 let conn = backend.get_connection().unwrap();
5593 let mut stmt = conn
5594 .prepare(
5595 "SELECT param_name, value_token_code, value_identifier_type_system, value_identifier_type_code
5596 FROM search_index
5597 WHERE tenant_id = 'test-tenant'
5598 AND resource_id = 'patient-type-test'
5599 AND param_name = 'identifier'",
5600 )
5601 .unwrap();
5602
5603 #[allow(clippy::type_complexity)]
5604 let rows: Vec<(String, Option<String>, Option<String>, Option<String>)> = stmt
5605 .query_map([], |row| {
5606 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
5607 })
5608 .unwrap()
5609 .filter_map(|r| r.ok())
5610 .collect();
5611
5612 assert!(
5614 !rows.is_empty(),
5615 "Should have indexed 'identifier' parameter for Patient"
5616 );
5617
5618 let entry = rows
5620 .iter()
5621 .find(|(_, code, _, _)| code.as_deref() == Some("MRN12345"));
5622 assert!(entry.is_some(), "Should have entry with value MRN12345");
5623
5624 let (_, _, type_system, type_code) = entry.unwrap();
5626 assert_eq!(
5627 type_system.as_deref(),
5628 Some("http://terminology.hl7.org/CodeSystem/v2-0203"),
5629 "Identifier type system should be populated"
5630 );
5631 assert_eq!(
5632 type_code.as_deref(),
5633 Some("MR"),
5634 "Identifier type code should be populated"
5635 );
5636 }
5637}