helios_persistence/sof/
mongodb.rs1use std::sync::Arc;
16
17use mongodb::bson::{Bson, DateTime as BsonDateTime, Document, doc};
18use mongodb::{Client, Collection};
19use tokio::sync::OnceCell;
20use tokio_stream::wrappers::ReceiverStream;
21use tracing::debug;
22
23use crate::backends::mongodb::backend::{MongoBackend, MongoBackendConfig, connect_client};
24use crate::core::sof_runner::{RowStream, SofError, SofRunner, ViewFilters, ViewRow};
25use crate::tenant::TenantContext;
26
27use super::compiler::compile_view_definition_mongo;
28
/// Capacity of the bounded channel that carries rows from the background
/// cursor-draining task to the row stream returned by `run_view`.
const CHANNEL_BUFFER: usize = 256;
31
32pub struct MongoInDbRunner {
35 client: Arc<OnceCell<Client>>,
37 config: MongoBackendConfig,
38}
39
40impl MongoInDbRunner {
41 pub fn new(client: Arc<OnceCell<Client>>, config: MongoBackendConfig) -> Self {
43 Self { client, config }
44 }
45
46 async fn resources(&self) -> Result<Collection<Document>, SofError> {
49 let client = self
50 .client
51 .get_or_try_init(|| connect_client(&self.config))
52 .await
53 .map_err(|e| SofError::Storage(e.to_string()))?;
54 Ok(client
55 .database(&self.config.database_name)
56 .collection(MongoBackend::RESOURCES_COLLECTION))
57 }
58}
59
60#[async_trait::async_trait]
61impl SofRunner for MongoInDbRunner {
62 fn runner_name(&self) -> &'static str {
63 "mongo-indb"
64 }
65
66 async fn run_view(
67 &self,
68 tenant: &TenantContext,
69 view_definition: serde_json::Value,
70 filters: ViewFilters,
71 ) -> Result<RowStream, SofError> {
72 if !filters.patient.is_empty() || !filters.group.is_empty() {
73 return Err(SofError::Uncompilable {
75 reason: "patient/group filters are not yet supported by the MongoDB runner"
76 .to_string(),
77 });
78 }
79
80 let compiled = compile_view_definition_mongo(&view_definition, self.config.fhir_version)?;
81
82 debug!(
83 runner = "mongo-indb",
84 tenant = %tenant.tenant_id(),
85 "executing compiled ViewDefinition"
86 );
87
88 let mut tenant_match = doc! { "tenant_id": tenant.tenant_id().to_string() };
92 if let Some(since) = filters.since {
93 tenant_match.insert(
94 "last_updated",
95 doc! { "$gte": BsonDateTime::from_millis(since.timestamp_millis()) },
96 );
97 }
98
99 let mut pipeline: Vec<Document> = Vec::with_capacity(compiled.pipeline.len() + 2);
100 pipeline.push(doc! { "$match": tenant_match });
101 pipeline.extend(compiled.pipeline);
102 if let Some(limit) = filters.limit {
103 pipeline.push(doc! { "$limit": limit as i64 });
104 }
105
106 let collection = self.resources().await?;
107 let (tx, rx) = tokio::sync::mpsc::channel::<Result<ViewRow, SofError>>(CHANNEL_BUFFER);
108
109 tokio::spawn(async move {
110 let mut cursor = match collection.aggregate(pipeline).await {
111 Ok(c) => c,
112 Err(e) => {
113 let _ = tx
114 .send(Err(SofError::Backend(format!("aggregate failed: {e}"))))
115 .await;
116 return;
117 }
118 };
119
120 let mut count = 0usize;
121 loop {
122 match cursor.advance().await {
123 Ok(true) => {}
124 Ok(false) => break,
125 Err(e) => {
126 let _ = tx
127 .send(Err(SofError::Backend(format!("cursor advance: {e}"))))
128 .await;
129 return;
130 }
131 }
132
133 let doc = match cursor.deserialize_current() {
134 Ok(d) => d,
135 Err(e) => {
136 let _ = tx
137 .send(Err(SofError::Backend(format!("cursor deserialize: {e}"))))
138 .await;
139 return;
140 }
141 };
142
143 let row = match mongodb::bson::from_bson::<ViewRow>(Bson::Document(doc)) {
144 Ok(v) => v,
145 Err(e) => {
146 let _ = tx
147 .send(Err(SofError::Backend(format!("row decode: {e}"))))
148 .await;
149 return;
150 }
151 };
152
153 count += 1;
154 if tx.send(Ok(row)).await.is_err() {
155 return;
157 }
158 }
159
160 debug!(
161 runner = "mongo-indb",
162 rows = count,
163 "in-DB view run complete"
164 );
165 });
166
167 Ok(Box::pin(ReceiverStream::new(rx)))
168 }
169}