use std::sync::Arc;
use mongodb::bson::{Bson, DateTime as BsonDateTime, Document, doc};
use mongodb::{Client, Collection};
use tokio::sync::OnceCell;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
use crate::backends::mongodb::backend::{MongoBackend, MongoBackendConfig, connect_client};
use crate::core::sof_runner::{RowStream, SofError, SofRunner, ViewFilters, ViewRow};
use crate::tenant::TenantContext;
use super::compiler::compile_view_definition_mongo;
/// Capacity of the bounded `tokio::sync::mpsc` channel that carries rows from the
/// spawned aggregation task to the stream returned by `run_view`.
const CHANNEL_BUFFER: usize = 256;
/// `SofRunner` implementation that compiles a ViewDefinition into a MongoDB
/// aggregation pipeline and executes it against the resources collection.
pub struct MongoInDbRunner {
/// Shared cell holding the MongoDB client; it is filled on first use through
/// `connect_client` when still empty.
client: Arc<OnceCell<Client>>,
/// Backend configuration; this runner reads the database name and FHIR version
/// from it and passes it to `connect_client`.
config: MongoBackendConfig,
}
impl MongoInDbRunner {
    /// Builds a runner around a shared client cell and the backend configuration.
    ///
    /// No connection is attempted here; the client is only initialised when the
    /// resources collection is first requested.
    pub fn new(client: Arc<OnceCell<Client>>, config: MongoBackendConfig) -> Self {
        Self { client, config }
    }

    /// Returns a handle to the resources collection of the configured database.
    ///
    /// The client is taken from the shared cell, connecting via `connect_client`
    /// if the cell has not been initialised yet.
    ///
    /// # Errors
    ///
    /// Returns `SofError::Storage` carrying the stringified connection error when
    /// the client cannot be initialised.
    async fn resources(&self) -> Result<Collection<Document>, SofError> {
        let connected = self
            .client
            .get_or_try_init(|| connect_client(&self.config))
            .await;

        match connected {
            Ok(client) => {
                let database = client.database(&self.config.database_name);
                Ok(database.collection::<Document>(MongoBackend::RESOURCES_COLLECTION))
            }
            Err(err) => Err(SofError::Storage(err.to_string())),
        }
    }
}
#[async_trait::async_trait]
impl SofRunner for MongoInDbRunner {
fn runner_name(&self) -> &'static str {
"mongo-indb"
}
async fn run_view(
&self,
tenant: &TenantContext,
view_definition: serde_json::Value,
filters: ViewFilters,
) -> Result<RowStream, SofError> {
if !filters.patient.is_empty() || !filters.group.is_empty() {
return Err(SofError::Uncompilable {
reason: "patient/group filters are not yet supported by the MongoDB runner"
.to_string(),
});
}
let compiled = compile_view_definition_mongo(&view_definition, self.config.fhir_version)?;
debug!(
runner = "mongo-indb",
tenant = %tenant.tenant_id(),
"executing compiled ViewDefinition"
);
let mut tenant_match = doc! { "tenant_id": tenant.tenant_id().to_string() };
if let Some(since) = filters.since {
tenant_match.insert(
"last_updated",
doc! { "$gte": BsonDateTime::from_millis(since.timestamp_millis()) },
);
}
let mut pipeline: Vec<Document> = Vec::with_capacity(compiled.pipeline.len() + 2);
pipeline.push(doc! { "$match": tenant_match });
pipeline.extend(compiled.pipeline);
if let Some(limit) = filters.limit {
pipeline.push(doc! { "$limit": limit as i64 });
}
let collection = self.resources().await?;
let (tx, rx) = tokio::sync::mpsc::channel::<Result<ViewRow, SofError>>(CHANNEL_BUFFER);
tokio::spawn(async move {
let mut cursor = match collection.aggregate(pipeline).await {
Ok(c) => c,
Err(e) => {
let _ = tx
.send(Err(SofError::Backend(format!("aggregate failed: {e}"))))
.await;
return;
}
};
let mut count = 0usize;
loop {
match cursor.advance().await {
Ok(true) => {}
Ok(false) => break,
Err(e) => {
let _ = tx
.send(Err(SofError::Backend(format!("cursor advance: {e}"))))
.await;
return;
}
}
let doc = match cursor.deserialize_current() {
Ok(d) => d,
Err(e) => {
let _ = tx
.send(Err(SofError::Backend(format!("cursor deserialize: {e}"))))
.await;
return;
}
};
let row = match mongodb::bson::from_bson::<ViewRow>(Bson::Document(doc)) {
Ok(v) => v,
Err(e) => {
let _ = tx
.send(Err(SofError::Backend(format!("row decode: {e}"))))
.await;
return;
}
};
count += 1;
if tx.send(Ok(row)).await.is_err() {
return;
}
}
debug!(
runner = "mongo-indb",
rows = count,
"in-DB view run complete"
);
});
Ok(Box::pin(ReceiverStream::new(rx)))
}
}