1use helios_fhir::FhirVersion;
15use r2d2::Pool;
16use r2d2_sqlite::SqliteConnectionManager;
17use rusqlite::types::ValueRef;
18use serde_json::{Map, Value};
19use tokio_stream::wrappers::ReceiverStream;
20use tracing::debug;
21
22use crate::core::sof_runner::{RowStream, SofError, SofRunner, ViewFilters, ViewRow};
23use crate::tenant::TenantContext;
24
25use super::compiler::{SqlDialect, compile_view_definition_dialect};
26
27const CHANNEL_BUFFER: usize = 256;
29
30pub struct SqliteInDbRunner {
32 pool: Pool<SqliteConnectionManager>,
33 fhir_version: FhirVersion,
34}
35
36impl SqliteInDbRunner {
37 pub fn new(pool: Pool<SqliteConnectionManager>) -> Self {
41 Self {
42 pool,
43 fhir_version: FhirVersion::default_enabled(),
44 }
45 }
46
47 pub fn with_fhir_version(mut self, version: FhirVersion) -> Self {
50 self.fhir_version = version;
51 self
52 }
53}
54
55#[async_trait::async_trait]
56impl SofRunner for SqliteInDbRunner {
57 fn runner_name(&self) -> &'static str {
58 "sqlite-indb"
59 }
60
61 async fn run_view(
62 &self,
63 tenant: &TenantContext,
64 view_definition: Value,
65 mut filters: ViewFilters,
66 ) -> Result<RowStream, SofError> {
67 let compiled = compile_view_definition_dialect(
69 &view_definition,
70 SqlDialect::Sqlite,
71 self.fhir_version,
72 )?;
73
74 debug!(
75 runner = "sqlite-indb",
76 tenant = %tenant.tenant_id(),
77 "executing compiled ViewDefinition"
78 );
79
80 let tenant_id = tenant.tenant_id().to_string();
81 let resource_type = view_definition
82 .get("resource")
83 .and_then(|v| v.as_str())
84 .unwrap_or("")
85 .to_string();
86
87 if !filters.group.is_empty() {
93 let resolved =
94 resolve_group_refs_to_patient_refs(&self.pool, &tenant_id, &filters.group)?;
95 for p in resolved {
96 if !filters.patient.iter().any(|existing| existing == &p) {
97 filters.patient.push(p);
98 }
99 }
100 filters.group.clear();
101 }
102
103 let limit = filters.limit;
104 let columns = compiled.columns.clone();
105 let pool = self.pool.clone();
106
107 let (sql, extra_params) = build_sqlite_sql(
111 &compiled.sql,
112 &compiled.constants,
113 &filters,
114 self.fhir_version,
115 &resource_type,
116 );
117
118 let (tx, rx) = tokio::sync::mpsc::channel::<Result<ViewRow, SofError>>(CHANNEL_BUFFER);
119
120 tokio::task::spawn_blocking(move || {
121 stream_sqlite_rows(
122 &pool,
123 &sql,
124 &tenant_id,
125 &resource_type,
126 extra_params,
127 &columns,
128 limit,
129 tx,
130 );
131 });
132
133 Ok(Box::pin(ReceiverStream::new(rx)))
134 }
135}
136
137fn resolve_group_refs_to_patient_refs(
144 pool: &Pool<SqliteConnectionManager>,
145 tenant_id: &str,
146 group_refs: &[String],
147) -> Result<Vec<String>, SofError> {
148 if group_refs.is_empty() {
149 return Ok(Vec::new());
150 }
151 let conn = pool
152 .get()
153 .map_err(|e| SofError::Storage(format!("failed to get sqlite connection: {e}")))?;
154 let mut stmt = conn
155 .prepare(
156 "SELECT data FROM resources \
157 WHERE tenant_id = ?1 \
158 AND resource_type = 'Group' \
159 AND id = ?2 \
160 AND is_deleted = 0",
161 )
162 .map_err(|e| SofError::Storage(format!("prepare failed: {e}")))?;
163
164 let mut groups = Vec::with_capacity(group_refs.len());
165 for r in group_refs {
166 let id = r.strip_prefix("Group/").unwrap_or(r);
167 let res: rusqlite::Result<Vec<u8>> = stmt.query_row([tenant_id, id], |row| row.get(0));
168 match res {
169 Ok(bytes) => match serde_json::from_slice::<Value>(&bytes) {
170 Ok(v) => groups.push(v),
171 Err(_) => continue,
172 },
173 Err(rusqlite::Error::QueryReturnedNoRows) => continue,
174 Err(e) => {
175 return Err(SofError::Storage(format!(
176 "group lookup failed for {r}: {e}"
177 )));
178 }
179 }
180 }
181
182 let set = helios_sof::resolve_group_members_to_patient_refs(group_refs, &groups);
183 Ok(set.into_iter().collect())
184}
185
186fn build_sqlite_sql(
199 base_sql: &str,
200 constants: &[super::ir::LitValue],
201 filters: &ViewFilters,
202 fhir_version: FhirVersion,
203 resource_type: &str,
204) -> (String, Vec<SqliteParam>) {
205 let mut conditions: Vec<String> = Vec::new();
206 let mut extra_params: Vec<SqliteParam> = constants
207 .iter()
208 .map(SqliteParam::from_lit)
209 .collect::<Vec<_>>();
210 let mut next_param = 3usize + constants.len();
211
212 if let Some(since) = &filters.since {
213 conditions.push(format!("r.last_updated >= ?{next_param}"));
214 extra_params.push(SqliteParam::Text(since.to_rfc3339()));
216 next_param += 1;
217 }
218
219 if let Some(c) = compartment_filter_sql(
220 fhir_version,
221 "Patient",
222 resource_type,
223 &filters.patient,
224 &mut next_param,
225 &mut extra_params,
226 ) {
227 conditions.push(c);
228 }
229
230 if let Some(c) = compartment_filter_sql(
231 fhir_version,
232 "Group",
233 resource_type,
234 &filters.group,
235 &mut next_param,
236 &mut extra_params,
237 ) {
238 conditions.push(c);
239 }
240
241 if conditions.is_empty() {
242 return (base_sql.to_string(), extra_params);
243 }
244
245 let joined = conditions.join(" AND ");
246 let sql = inject_before_order_by(base_sql, &format!(" AND {joined}"));
247 (sql, extra_params)
248}
249
250fn compartment_filter_sql(
270 fhir_version: FhirVersion,
271 compartment_type: &str,
272 resource_type: &str,
273 compartment_refs: &[String],
274 next_param: &mut usize,
275 extra_params: &mut Vec<SqliteParam>,
276) -> Option<String> {
277 if compartment_refs.is_empty() {
278 return None;
279 }
280
281 let canonical_prefix = format!("{}/", compartment_type);
282
283 if resource_type == compartment_type {
285 let mut ors: Vec<String> = Vec::with_capacity(compartment_refs.len());
286 for r in compartment_refs {
287 let id = r.strip_prefix(canonical_prefix.as_str()).unwrap_or(r);
288 let p = *next_param;
289 ors.push(format!("r.id = ?{p}"));
290 extra_params.push(SqliteParam::Text(id.to_string()));
291 *next_param += 1;
292 }
293 return Some(format!("({})", ors.join(" OR ")));
294 }
295
296 let names = helios_fhir::compartment_params(fhir_version, compartment_type, resource_type);
299 if names.is_empty() {
300 return Some("1=0".to_string());
304 }
305
306 let mut name_placeholders = Vec::with_capacity(names.len());
307 for n in names {
308 let p = *next_param;
309 name_placeholders.push(format!("?{p}"));
310 extra_params.push(SqliteParam::Text((*n).to_string()));
311 *next_param += 1;
312 }
313
314 let mut ref_placeholders = Vec::with_capacity(compartment_refs.len());
315 for r in compartment_refs {
316 let canonical = if r.starts_with(canonical_prefix.as_str()) {
317 r.clone()
318 } else {
319 format!("{}{}", canonical_prefix, r)
320 };
321 let p = *next_param;
322 ref_placeholders.push(format!("?{p}"));
323 extra_params.push(SqliteParam::Text(canonical));
324 *next_param += 1;
325 }
326
327 Some(format!(
331 "EXISTS (SELECT 1 FROM search_index si \
332 WHERE si.tenant_id = ?1 \
333 AND si.resource_type = ?2 \
334 AND si.resource_id = r.id \
335 AND si.param_name IN ({}) \
336 AND si.value_reference IN ({}))",
337 name_placeholders.join(","),
338 ref_placeholders.join(",")
339 ))
340}
341
342fn inject_before_order_by(sql: &str, extra: &str) -> String {
348 let search = ["\nORDER BY", " ORDER BY"];
350 for pat in search {
351 if let Some(pos) = sql.rfind(pat) {
352 let mut s = sql.to_string();
353 s.insert_str(pos, extra);
354 return s;
355 }
356 }
357 format!("{sql}{extra}")
358}
359
360#[derive(Clone, Debug)]
367enum SqliteParam {
368 Text(String),
369 Bool(bool),
370 Int(i64),
371 Decimal(String),
374 Null,
375}
376
377impl SqliteParam {
378 fn from_lit(v: &super::ir::LitValue) -> Self {
379 match v {
380 super::ir::LitValue::Null => SqliteParam::Null,
381 super::ir::LitValue::Bool(b) => SqliteParam::Bool(*b),
382 super::ir::LitValue::Int(n) => SqliteParam::Int(*n),
383 super::ir::LitValue::Decimal(s) => SqliteParam::Decimal(s.clone()),
384 super::ir::LitValue::Str(s) => SqliteParam::Text(s.clone()),
385 }
386 }
387}
388
389impl rusqlite::ToSql for SqliteParam {
390 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
391 use rusqlite::types::{ToSqlOutput, Value};
392 Ok(match self {
393 SqliteParam::Text(s) => ToSqlOutput::Borrowed(s.as_str().into()),
394 SqliteParam::Bool(b) => ToSqlOutput::Owned(Value::Integer(if *b { 1 } else { 0 })),
395 SqliteParam::Int(n) => ToSqlOutput::Owned(Value::Integer(*n)),
396 SqliteParam::Decimal(s) => match s.parse::<f64>() {
402 Ok(n) => ToSqlOutput::Owned(Value::Real(n)),
403 Err(_) => ToSqlOutput::Owned(Value::Text(s.clone())),
404 },
405 SqliteParam::Null => ToSqlOutput::Owned(Value::Null),
406 })
407 }
408}
409
410#[allow(clippy::too_many_arguments)]
415fn stream_sqlite_rows(
416 pool: &Pool<SqliteConnectionManager>,
417 sql: &str,
418 tenant_id: &str,
419 resource_type: &str,
420 extra_params: Vec<SqliteParam>,
421 columns: &[String],
422 limit: Option<usize>,
423 tx: tokio::sync::mpsc::Sender<Result<ViewRow, SofError>>,
424) {
425 let conn = match pool.get() {
426 Ok(c) => c,
427 Err(e) => {
428 let _ = tx.blocking_send(Err(SofError::Storage(format!(
429 "failed to acquire SQLite connection: {e}"
430 ))));
431 return;
432 }
433 };
434
435 let mut stmt = match conn.prepare(sql) {
436 Ok(s) => s,
437 Err(e) => {
438 let _ = tx.blocking_send(Err(SofError::Backend(format!(
439 "failed to prepare SQL: {e}"
440 ))));
441 return;
442 }
443 };
444
445 let mut all_params: Vec<SqliteParam> = Vec::with_capacity(2 + extra_params.len());
448 all_params.push(SqliteParam::Text(tenant_id.to_string()));
449 all_params.push(SqliteParam::Text(resource_type.to_string()));
450 all_params.extend(extra_params);
451
452 let row_iter = {
453 match stmt.query_map(rusqlite::params_from_iter(all_params.iter()), |row| {
454 map_sqlite_row(row, columns)
455 }) {
456 Ok(iter) => iter,
457 Err(e) => {
458 let _ = tx.blocking_send(Err(SofError::Backend(format!(
459 "query execution failed: {e}"
460 ))));
461 return;
462 }
463 }
464 };
465
466 let mut count = 0usize;
467 for row_result in row_iter {
468 if let Some(cap) = limit {
469 if count >= cap {
470 break;
471 }
472 }
473 count += 1;
474
475 let row = match row_result {
476 Ok(map) => Ok(Value::Object(map)),
477 Err(e) => Err(SofError::Backend(format!("row error: {e}"))),
478 };
479
480 if tx.blocking_send(row).is_err() {
481 break;
483 }
484 }
485
486 debug!(
487 runner = "sqlite-indb",
488 rows = count,
489 "in-DB view run complete"
490 );
491 }
493
494fn map_sqlite_row(
495 row: &rusqlite::Row<'_>,
496 columns: &[String],
497) -> rusqlite::Result<Map<String, Value>> {
498 let mut map = Map::new();
499 for (i, name) in columns.iter().enumerate() {
500 let val = match row.get_ref(i)? {
501 ValueRef::Null => Value::Null,
502 ValueRef::Integer(n) => Value::from(n),
503 ValueRef::Real(f) => {
504 Value::from(serde_json::Number::from_f64(f).unwrap_or(serde_json::Number::from(0)))
505 }
506 ValueRef::Text(b) => {
507 let s = String::from_utf8_lossy(b).into_owned();
508 serde_json::from_str(&s).unwrap_or(Value::String(s))
509 }
510 ValueRef::Blob(b) => {
511 let s = String::from_utf8_lossy(b).into_owned();
512 serde_json::from_str(&s).unwrap_or(Value::String(s))
513 }
514 };
515 if val != Value::Null {
516 map.insert(name.clone(), val);
517 }
518 }
519 Ok(map)
520}