vegafusion_runtime/task_graph/
grpc_runtime.rs1use vegafusion_core::{
2 data::dataset::VegaFusionDataset,
3 proto::gen::{
4 services::{
5 query_request, query_result, vega_fusion_runtime_client::VegaFusionRuntimeClient,
6 QueryRequest,
7 },
8 tasks::{NodeValueIndex, TaskGraph, TaskGraphValueRequest},
9 },
10 runtime::VegaFusionRuntimeTrait,
11 task_graph::task_value::NamedTaskValue,
12};
13
14use crate::task_graph::runtime::encode_inline_datasets;
15use async_mutex::Mutex;
16use async_trait::async_trait;
17use std::collections::HashMap;
18use std::{any::Any, sync::Arc};
19use vegafusion_common::error::{Result, VegaFusionError};
20
21#[derive(Clone)]
22pub struct GrpcVegaFusionRuntime {
23 client: Arc<Mutex<VegaFusionRuntimeClient<tonic::transport::Channel>>>,
24}
25
26#[async_trait]
27impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime {
28 fn as_any(&self) -> &dyn Any {
29 self
30 }
31
32 async fn query_request(
33 &self,
34 task_graph: Arc<TaskGraph>,
35 indices: &[NodeValueIndex],
36 inline_datasets: &HashMap<String, VegaFusionDataset>,
37 ) -> Result<Vec<NamedTaskValue>> {
38 let inline_datasets = encode_inline_datasets(inline_datasets)?;
39 let request = QueryRequest {
40 request: Some(query_request::Request::TaskGraphValues(
41 TaskGraphValueRequest {
42 task_graph: Some(task_graph.as_ref().clone()),
43 indices: indices.to_vec(),
44 inline_datasets,
45 },
46 )),
47 };
48
49 let mut locked_client = self.client.lock().await;
50 let response = locked_client
51 .task_graph_query(request)
52 .await
53 .map_err(|e| VegaFusionError::internal(e.to_string()))?;
54 match response.into_inner().response.unwrap() {
55 query_result::Response::TaskGraphValues(task_graph_values) => Ok(task_graph_values
56 .response_values
57 .into_iter()
58 .map(|v| v.into())
59 .collect::<Vec<_>>()),
60 _ => Err(VegaFusionError::internal(
61 "Invalid response type".to_string(),
62 )),
63 }
64 }
65}
66
67impl GrpcVegaFusionRuntime {
68 pub async fn try_new(channel: tonic::transport::Channel) -> Result<Self> {
69 let client = VegaFusionRuntimeClient::new(channel);
70 Ok(Self {
71 client: Arc::new(Mutex::new(client)),
72 })
73 }
74}