Skip to main content

helios_persistence/sof/
mongodb.rs

1//! MongoDB in-DB SQL-on-FHIR runner.
2//!
3//! [`MongoInDbRunner`] compiles a ViewDefinition to a MongoDB aggregation
4//! pipeline (via [`compile_view_definition_mongo`](super::compiler::compile_view_definition_mongo))
5//! and executes it directly against the `resources` collection, bypassing
6//! in-process FHIRPath evaluation. Resources are stored as native BSON
7//! sub-documents under `data`, so the emitted pipeline navigates them with
8//! `$getField`/dotted paths.
9//!
10//! The runtime filters that depend on the request (tenant, `_since`, row limit)
11//! are layered onto the compiled pipeline here: a leading `$match` constrains
12//! the tenant (and `last_updated` for `since`), and a trailing `$limit` caps the
13//! output.
14
15use std::sync::Arc;
16
17use mongodb::bson::{Bson, DateTime as BsonDateTime, Document, doc};
18use mongodb::{Client, Collection};
19use tokio::sync::OnceCell;
20use tokio_stream::wrappers::ReceiverStream;
21use tracing::debug;
22
23use crate::backends::mongodb::backend::{MongoBackend, MongoBackendConfig, connect_client};
24use crate::core::sof_runner::{RowStream, SofError, SofRunner, ViewFilters, ViewRow};
25use crate::tenant::TenantContext;
26
27use super::compiler::compile_view_definition_mongo;
28
/// Capacity of the bounded channel between the aggregation task and the
/// returned row stream: at most this many rows are queued before the
/// producing task has to wait for the consumer.
const CHANNEL_BUFFER: usize = 256;
31
32/// SQL-on-FHIR runner that compiles ViewDefinitions to MongoDB aggregation
33/// pipelines.
34pub struct MongoInDbRunner {
35    /// Shared (lazily initialised) client cell — the same pool the backend uses.
36    client: Arc<OnceCell<Client>>,
37    config: MongoBackendConfig,
38}
39
40impl MongoInDbRunner {
41    /// Creates a runner sharing the backend's client cell and configuration.
42    pub fn new(client: Arc<OnceCell<Client>>, config: MongoBackendConfig) -> Self {
43        Self { client, config }
44    }
45
46    /// Resolves the `resources` collection, initialising the shared client on
47    /// first use.
48    async fn resources(&self) -> Result<Collection<Document>, SofError> {
49        let client = self
50            .client
51            .get_or_try_init(|| connect_client(&self.config))
52            .await
53            .map_err(|e| SofError::Storage(e.to_string()))?;
54        Ok(client
55            .database(&self.config.database_name)
56            .collection(MongoBackend::RESOURCES_COLLECTION))
57    }
58}
59
60#[async_trait::async_trait]
61impl SofRunner for MongoInDbRunner {
62    fn runner_name(&self) -> &'static str {
63        "mongo-indb"
64    }
65
66    async fn run_view(
67        &self,
68        tenant: &TenantContext,
69        view_definition: serde_json::Value,
70        filters: ViewFilters,
71    ) -> Result<RowStream, SofError> {
72        if !filters.patient.is_empty() || !filters.group.is_empty() {
73            // Compartment filtering for the Mongo runner lands in a later stage.
74            return Err(SofError::Uncompilable {
75                reason: "patient/group filters are not yet supported by the MongoDB runner"
76                    .to_string(),
77            });
78        }
79
80        let compiled = compile_view_definition_mongo(&view_definition, self.config.fhir_version)?;
81
82        debug!(
83            runner = "mongo-indb",
84            tenant = %tenant.tenant_id(),
85            "executing compiled ViewDefinition"
86        );
87
88        // Leading tenant (and optional `since`) predicate, prepended to the
89        // compiled pipeline. Kept first so the `tenant_id + resource_type +
90        // is_deleted` index can serve the scan.
91        let mut tenant_match = doc! { "tenant_id": tenant.tenant_id().to_string() };
92        if let Some(since) = filters.since {
93            tenant_match.insert(
94                "last_updated",
95                doc! { "$gte": BsonDateTime::from_millis(since.timestamp_millis()) },
96            );
97        }
98
99        let mut pipeline: Vec<Document> = Vec::with_capacity(compiled.pipeline.len() + 2);
100        pipeline.push(doc! { "$match": tenant_match });
101        pipeline.extend(compiled.pipeline);
102        if let Some(limit) = filters.limit {
103            pipeline.push(doc! { "$limit": limit as i64 });
104        }
105
106        let collection = self.resources().await?;
107        let (tx, rx) = tokio::sync::mpsc::channel::<Result<ViewRow, SofError>>(CHANNEL_BUFFER);
108
109        tokio::spawn(async move {
110            let mut cursor = match collection.aggregate(pipeline).await {
111                Ok(c) => c,
112                Err(e) => {
113                    let _ = tx
114                        .send(Err(SofError::Backend(format!("aggregate failed: {e}"))))
115                        .await;
116                    return;
117                }
118            };
119
120            let mut count = 0usize;
121            loop {
122                match cursor.advance().await {
123                    Ok(true) => {}
124                    Ok(false) => break,
125                    Err(e) => {
126                        let _ = tx
127                            .send(Err(SofError::Backend(format!("cursor advance: {e}"))))
128                            .await;
129                        return;
130                    }
131                }
132
133                let doc = match cursor.deserialize_current() {
134                    Ok(d) => d,
135                    Err(e) => {
136                        let _ = tx
137                            .send(Err(SofError::Backend(format!("cursor deserialize: {e}"))))
138                            .await;
139                        return;
140                    }
141                };
142
143                let row = match mongodb::bson::from_bson::<ViewRow>(Bson::Document(doc)) {
144                    Ok(v) => v,
145                    Err(e) => {
146                        let _ = tx
147                            .send(Err(SofError::Backend(format!("row decode: {e}"))))
148                            .await;
149                        return;
150                    }
151                };
152
153                count += 1;
154                if tx.send(Ok(row)).await.is_err() {
155                    // Receiver dropped (client disconnected) — stop iterating.
156                    return;
157                }
158            }
159
160            debug!(
161                runner = "mongo-indb",
162                rows = count,
163                "in-DB view run complete"
164            );
165        });
166
167        Ok(Box::pin(ReceiverStream::new(rx)))
168    }
169}