vegafusion_runtime/task_graph/
grpc_runtime.rs

1use vegafusion_core::{
2    data::dataset::VegaFusionDataset,
3    proto::gen::{
4        services::{
5            query_request, query_result, vega_fusion_runtime_client::VegaFusionRuntimeClient,
6            QueryRequest,
7        },
8        tasks::{NodeValueIndex, TaskGraph, TaskGraphValueRequest},
9    },
10    runtime::VegaFusionRuntimeTrait,
11    task_graph::task_value::NamedTaskValue,
12};
13
14use crate::task_graph::runtime::encode_inline_datasets;
15use async_mutex::Mutex;
16use async_trait::async_trait;
17use std::collections::HashMap;
18use std::{any::Any, sync::Arc};
19use vegafusion_common::error::{Result, VegaFusionError};
20
21#[derive(Clone)]
22pub struct GrpcVegaFusionRuntime {
23    client: Arc<Mutex<VegaFusionRuntimeClient<tonic::transport::Channel>>>,
24}
25
26#[async_trait]
27impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime {
28    fn as_any(&self) -> &dyn Any {
29        self
30    }
31
32    async fn query_request(
33        &self,
34        task_graph: Arc<TaskGraph>,
35        indices: &[NodeValueIndex],
36        inline_datasets: &HashMap<String, VegaFusionDataset>,
37    ) -> Result<Vec<NamedTaskValue>> {
38        let inline_datasets = encode_inline_datasets(inline_datasets)?;
39        let request = QueryRequest {
40            request: Some(query_request::Request::TaskGraphValues(
41                TaskGraphValueRequest {
42                    task_graph: Some(task_graph.as_ref().clone()),
43                    indices: indices.to_vec(),
44                    inline_datasets,
45                },
46            )),
47        };
48
49        let mut locked_client = self.client.lock().await;
50        let response = locked_client
51            .task_graph_query(request)
52            .await
53            .map_err(|e| VegaFusionError::internal(e.to_string()))?;
54        match response.into_inner().response.unwrap() {
55            query_result::Response::TaskGraphValues(task_graph_values) => Ok(task_graph_values
56                .response_values
57                .into_iter()
58                .map(|v| v.into())
59                .collect::<Vec<_>>()),
60            _ => Err(VegaFusionError::internal(
61                "Invalid response type".to_string(),
62            )),
63        }
64    }
65}
66
67impl GrpcVegaFusionRuntime {
68    pub async fn try_new(channel: tonic::transport::Channel) -> Result<Self> {
69        let client = VegaFusionRuntimeClient::new(channel);
70        Ok(Self {
71            client: Arc::new(Mutex::new(client)),
72        })
73    }
74}