1use anyhow::Context;
2use mesh_proto::data_plane_client::DataPlaneClient as TonicDataPlaneClient;
3use mesh_proto::{EncryptedChunk, HandshakeRequest, ResultRequest};
4use tonic::transport::Channel;
5use tonic::Request;
6use async_stream::stream;
7
8pub struct DataPlaneConsumerClient {
9 inner: TonicDataPlaneClient<Channel>,
10}
11
12impl DataPlaneConsumerClient {
13 pub async fn connect(endpoint: &str) -> Result<Self, anyhow::Error> {
14 let inner = TonicDataPlaneClient::connect(endpoint.to_string()).await.context("connect grpc")?;
15 Ok(Self { inner })
16 }
17
18 pub async fn call(&mut self, session_id: &str, session_token: &str, consumer_did: &str, payload: &[u8]) -> Result<Vec<u8>, anyhow::Error> {
20 let hs = HandshakeRequest {
22 session_id: session_id.to_string(),
23 session_token: session_token.to_string(),
24 consumer_ephemeral_pub: Vec::new(),
25 consumer_did: consumer_did.to_string(),
26 consumer_did_signature: Vec::new(),
27 };
28 let _ = self.inner.handshake(Request::new(hs)).await.context("handshake failed")?;
29
30 let sid = session_id.to_string();
32 let payload_buf = payload.to_vec();
33
34 let outbound = stream! {
36 let meta = serde_json::json!({"session_id": sid});
38 let meta_chunk = EncryptedChunk { ciphertext: meta.to_string().into_bytes(), nonce: Vec::new(), sequence: 0, is_final: false, algorithm: "none".to_string() };
39 yield meta_chunk;
40 let chunk = EncryptedChunk { ciphertext: payload_buf.clone(), nonce: Vec::new(), sequence: 1, is_final: true, algorithm: "none".to_string() };
41 yield chunk;
42 };
43
44 let _ack = self.inner.transfer(Request::new(outbound)).await.context("transfer failed")?;
45
46 let mut stream = self.inner.result(Request::new(ResultRequest { session_id: session_id.to_string() })).await?.into_inner();
48 let mut result = Vec::new();
49 while let Some(chunk) = stream.message().await.context("result stream error")? {
50 result.extend_from_slice(&chunk.ciphertext);
51 if chunk.is_final { break; }
52 }
53 Ok(result)
54 }
55}