1use deadpool_postgres::Pool;
16use futures::StreamExt as _;
17use helios_fhir::FhirVersion;
18use serde_json::{Map, Value};
19use tokio_stream::wrappers::ReceiverStream;
20use tracing::debug;
21
22use crate::core::sof_runner::{RowStream, SofError, SofRunner, ViewFilters, ViewRow};
23use crate::tenant::TenantContext;
24
25use super::compiler::{SqlDialect, compile_view_definition_dialect};
26
/// Capacity of the bounded `tokio::sync::mpsc` channel that carries rows from
/// the spawned query task to the stream returned by `run_view`.
///
/// Once this many rows are queued and unread, the producer's `send().await`
/// waits until the consumer makes room.
const CHANNEL_BUFFER: usize = 256;
29
30pub struct PgInDbRunner {
32 pool: Pool,
33 fhir_version: FhirVersion,
34}
35
36impl PgInDbRunner {
37 pub fn new(pool: Pool) -> Self {
41 Self {
42 pool,
43 fhir_version: FhirVersion::default_enabled(),
44 }
45 }
46
47 pub fn with_fhir_version(mut self, version: FhirVersion) -> Self {
50 self.fhir_version = version;
51 self
52 }
53}
54
55#[async_trait::async_trait]
56impl SofRunner for PgInDbRunner {
57 fn runner_name(&self) -> &'static str {
58 "postgres-indb"
59 }
60
61 async fn run_view(
62 &self,
63 tenant: &TenantContext,
64 view_definition: Value,
65 mut filters: ViewFilters,
66 ) -> Result<RowStream, SofError> {
67 let compiled = compile_view_definition_dialect(
69 &view_definition,
70 SqlDialect::Postgres,
71 self.fhir_version,
72 )?;
73
74 debug!(
75 runner = "postgres-indb",
76 tenant = %tenant.tenant_id(),
77 "executing compiled ViewDefinition"
78 );
79
80 let tenant_id = tenant.tenant_id().to_string();
81 let resource_type = view_definition
82 .get("resource")
83 .and_then(|v| v.as_str())
84 .unwrap_or("")
85 .to_string();
86
87 if !filters.group.is_empty() {
91 let resolved =
92 resolve_group_refs_to_patient_refs(&self.pool, &tenant_id, &filters.group).await?;
93 for p in resolved {
94 if !filters.patient.iter().any(|existing| existing == &p) {
95 filters.patient.push(p);
96 }
97 }
98 filters.group.clear();
99 }
100
101 let limit = filters.limit;
102 let columns = compiled.columns.clone();
103 let pool = self.pool.clone();
104
105 let (sql, params) = build_pg_sql_and_params(
109 &compiled.sql,
110 tenant_id,
111 resource_type,
112 &compiled.constants,
113 &filters,
114 self.fhir_version,
115 );
116
117 let (tx, rx) = tokio::sync::mpsc::channel::<Result<ViewRow, SofError>>(CHANNEL_BUFFER);
118
119 tokio::spawn(async move {
120 stream_pg_rows(pool, sql, params, columns, limit, tx).await;
121 });
122
123 Ok(Box::pin(ReceiverStream::new(rx)))
124 }
125}
126
127async fn resolve_group_refs_to_patient_refs(
134 pool: &Pool,
135 tenant_id: &str,
136 group_refs: &[String],
137) -> Result<Vec<String>, SofError> {
138 if group_refs.is_empty() {
139 return Ok(Vec::new());
140 }
141 let client = pool
142 .get()
143 .await
144 .map_err(|e| SofError::Storage(format!("failed to get pg connection: {e}")))?;
145 let stmt = client
146 .prepare(
147 "SELECT data FROM resources \
148 WHERE tenant_id = $1 \
149 AND resource_type = 'Group' \
150 AND id = $2 \
151 AND is_deleted = false",
152 )
153 .await
154 .map_err(|e| SofError::Storage(format!("prepare failed: {e}")))?;
155
156 let mut groups = Vec::with_capacity(group_refs.len());
157 for r in group_refs {
158 let id = r.strip_prefix("Group/").unwrap_or(r);
159 match client.query_opt(&stmt, &[&tenant_id, &id]).await {
160 Ok(Some(row)) => {
161 let data: Value = row.get(0);
162 groups.push(data);
163 }
164 Ok(None) => continue,
165 Err(e) => {
166 return Err(SofError::Storage(format!(
167 "group lookup failed for {r}: {e}"
168 )));
169 }
170 }
171 }
172
173 let set = helios_sof::resolve_group_members_to_patient_refs(group_refs, &groups);
174 Ok(set.into_iter().collect())
175}
176
177fn build_pg_sql_and_params(
186 base_sql: &str,
187 tenant_id: String,
188 resource_type: String,
189 constants: &[super::ir::LitValue],
190 filters: &ViewFilters,
191 fhir_version: FhirVersion,
192) -> (String, Vec<PgParam>) {
193 let mut conditions: Vec<String> = Vec::new();
194 let mut extra: Vec<PgParam> = Vec::new();
195 let mut constant_params: Vec<PgParam> = Vec::with_capacity(constants.len());
198 for c in constants {
199 constant_params.push(PgParam::from_lit(c));
200 }
201 let mut next_param = 3usize + constants.len();
202
203 if let Some(since) = filters.since {
204 conditions.push(format!("r.last_updated >= ${next_param}"));
205 extra.push(PgParam::Timestamp(since));
206 next_param += 1;
207 }
208
209 if let Some(c) = compartment_filter_sql(
210 fhir_version,
211 "Patient",
212 &resource_type,
213 &filters.patient,
214 &mut next_param,
215 &mut extra,
216 ) {
217 conditions.push(c);
218 }
219
220 if let Some(c) = compartment_filter_sql(
221 fhir_version,
222 "Group",
223 &resource_type,
224 &filters.group,
225 &mut next_param,
226 &mut extra,
227 ) {
228 conditions.push(c);
229 }
230
231 let sql = if conditions.is_empty() {
232 base_sql.to_string()
233 } else {
234 let joined = conditions.join(" AND ");
235 inject_before_order_by(base_sql, &format!(" AND {joined}"))
236 };
237
238 let mut all_params = vec![PgParam::Text(tenant_id), PgParam::Text(resource_type)];
239 all_params.extend(constant_params);
240 all_params.extend(extra);
241
242 (sql, all_params)
243}
244
245fn compartment_filter_sql(
254 fhir_version: FhirVersion,
255 compartment_type: &str,
256 resource_type: &str,
257 compartment_refs: &[String],
258 next_param: &mut usize,
259 extra_params: &mut Vec<PgParam>,
260) -> Option<String> {
261 if compartment_refs.is_empty() {
262 return None;
263 }
264
265 let canonical_prefix = format!("{}/", compartment_type);
266
267 if resource_type == compartment_type {
269 let mut ors: Vec<String> = Vec::with_capacity(compartment_refs.len());
270 for r in compartment_refs {
271 let id = r.strip_prefix(canonical_prefix.as_str()).unwrap_or(r);
272 let p = *next_param;
273 ors.push(format!("r.id = ${p}"));
274 extra_params.push(PgParam::Text(id.to_string()));
275 *next_param += 1;
276 }
277 return Some(format!("({})", ors.join(" OR ")));
278 }
279
280 let names = helios_fhir::compartment_params(fhir_version, compartment_type, resource_type);
283 if names.is_empty() {
284 return Some("1=0".to_string());
285 }
286
287 let mut name_placeholders = Vec::with_capacity(names.len());
288 for n in names {
289 let p = *next_param;
290 name_placeholders.push(format!("${p}"));
291 extra_params.push(PgParam::Text((*n).to_string()));
292 *next_param += 1;
293 }
294
295 let mut ref_placeholders = Vec::with_capacity(compartment_refs.len());
296 for r in compartment_refs {
297 let canonical = if r.starts_with(canonical_prefix.as_str()) {
298 r.clone()
299 } else {
300 format!("{}{}", canonical_prefix, r)
301 };
302 let p = *next_param;
303 ref_placeholders.push(format!("${p}"));
304 extra_params.push(PgParam::Text(canonical));
305 *next_param += 1;
306 }
307
308 Some(format!(
312 "EXISTS (SELECT 1 FROM search_index si \
313 WHERE si.tenant_id = $1 \
314 AND si.resource_type = $2 \
315 AND si.resource_id = r.id \
316 AND si.param_name IN ({}) \
317 AND si.value_reference IN ({}))",
318 name_placeholders.join(","),
319 ref_placeholders.join(",")
320 ))
321}
322
/// Returns `sql` with `extra` inserted immediately before its `ORDER BY`
/// clause, or appended at the end when no such clause is found.
///
/// The last occurrence of `"\nORDER BY"` is preferred; only when that pattern
/// is absent is the last occurrence of `" ORDER BY"` used. Matching is
/// case-sensitive.
fn inject_before_order_by(sql: &str, extra: &str) -> String {
    let insert_at = ["\nORDER BY", " ORDER BY"]
        .iter()
        .find_map(|pat| sql.rfind(*pat))
        .unwrap_or(sql.len());

    let (head, tail) = sql.split_at(insert_at);
    format!("{head}{extra}{tail}")
}
338
339#[derive(Clone)]
345enum PgParam {
346 Text(String),
347 Bool(bool),
348 Int(i64),
349 Decimal(String),
350 Null,
351 Timestamp(chrono::DateTime<chrono::Utc>),
352}
353
354impl PgParam {
355 fn from_lit(v: &super::ir::LitValue) -> Self {
359 match v {
360 super::ir::LitValue::Null => PgParam::Null,
361 super::ir::LitValue::Bool(b) => PgParam::Bool(*b),
362 super::ir::LitValue::Int(n) => PgParam::Int(*n),
363 super::ir::LitValue::Decimal(s) => PgParam::Decimal(s.clone()),
364 super::ir::LitValue::Str(s) => PgParam::Text(s.clone()),
365 }
366 }
367}
368
369async fn stream_pg_rows(
374 pool: Pool,
375 sql: String,
376 params: Vec<PgParam>,
377 columns: Vec<String>,
378 limit: Option<usize>,
379 tx: tokio::sync::mpsc::Sender<Result<ViewRow, SofError>>,
380) {
381 if let Err(e) = stream_pg_rows_inner(pool, sql, params, columns, limit, &tx).await {
382 let _ = tx.send(Err(e)).await;
383 }
384}
385
386async fn stream_pg_rows_inner(
387 pool: Pool,
388 sql: String,
389 params: Vec<PgParam>,
390 columns: Vec<String>,
391 limit: Option<usize>,
392 tx: &tokio::sync::mpsc::Sender<Result<ViewRow, SofError>>,
393) -> Result<(), SofError> {
394 let client = pool
395 .get()
396 .await
397 .map_err(|e| SofError::Storage(format!("failed to acquire Postgres connection: {e}")))?;
398
399 if std::env::var("PG_SOF_DEBUG_ALL").is_ok() {
400 eprintln!("[PG_SOF_DEBUG_ALL] preparing\n--- SQL ---\n{sql}\n---");
401 }
402 let stmt = client.prepare(&sql).await.map_err(|e| {
403 if std::env::var("PG_SOF_DEBUG").is_ok() {
404 eprintln!("[PG_SOF_DEBUG] prepare failed: {e}\n--- SQL ---\n{sql}\n---");
405 }
406 SofError::Backend(format!("failed to prepare SQL: {e}"))
407 })?;
408
409 let boxed: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = params
411 .into_iter()
412 .map(|p| -> Box<dyn tokio_postgres::types::ToSql + Sync + Send> {
413 match p {
414 PgParam::Text(s) => Box::new(s),
415 PgParam::Bool(b) => Box::new(if b {
421 "true".to_string()
422 } else {
423 "false".to_string()
424 }),
425 PgParam::Int(n) => Box::new(n.to_string()),
426 PgParam::Decimal(s) => Box::new(s),
427 PgParam::Null => Box::new(None::<String>),
428 PgParam::Timestamp(dt) => Box::new(dt),
429 }
430 })
431 .collect();
432
433 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = boxed
437 .iter()
438 .map(|b| b.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
439 .collect();
440
441 let raw = client
442 .query_raw(&stmt, param_refs.iter().copied())
443 .await
444 .map_err(|e| {
445 if std::env::var("PG_SOF_DEBUG").is_ok() {
446 eprintln!("[PG_SOF_DEBUG] query failed: {e}\n--- SQL ---\n{sql}\n---");
447 }
448 SofError::Backend(format!("query execution failed: {e}"))
449 })?;
450
451 drop(param_refs);
453 drop(boxed);
454
455 futures::pin_mut!(raw);
456
457 let mut count = 0usize;
458 while let Some(row_result) = raw.next().await {
459 match row_result {
460 Ok(pg_row) => {
461 if let Some(cap) = limit {
462 if count >= cap {
463 break;
464 }
465 }
466 count += 1;
467 match row_to_json(&pg_row, &columns) {
468 Ok(row) => {
469 if tx.send(Ok(row)).await.is_err() {
470 break; }
472 }
473 Err(e) => {
474 let _ = tx.send(Err(e)).await;
475 break;
476 }
477 }
478 }
479 Err(e) => {
480 if std::env::var("PG_SOF_DEBUG").is_ok() {
481 eprintln!("[PG_SOF_DEBUG] row error: {e}\n--- SQL ---\n{sql}\n---");
482 }
483 let _ = tx
484 .send(Err(SofError::Backend(format!("row error: {e}"))))
485 .await;
486 break;
487 }
488 }
489 }
490
491 debug!(
492 runner = "postgres-indb",
493 rows = count,
494 "in-DB view run complete"
495 );
496 Ok(())
497 }
499
500fn row_to_json(pg_row: &tokio_postgres::Row, columns: &[String]) -> Result<ViewRow, SofError> {
508 let mut map = Map::new();
509 for (i, name) in columns.iter().enumerate() {
510 let val: Option<String> = pg_row
511 .try_get(i)
512 .map_err(|e| SofError::Backend(format!("failed to read column '{name}': {e}")))?;
513
514 if let Some(s) = val {
515 let json_val = serde_json::from_str(&s).unwrap_or(Value::String(s));
516 map.insert(name.clone(), json_val);
517 }
518 }
519 Ok(Value::Object(map))
520}