Skip to main content

mesh_dataplane/
client.rs

1use anyhow::Context;
2use mesh_proto::data_plane_client::DataPlaneClient as TonicDataPlaneClient;
3use mesh_proto::{EncryptedChunk, HandshakeRequest, ResultRequest};
4use tonic::transport::Channel;
5use tonic::Request;
6use async_stream::stream;
7
8pub struct DataPlaneConsumerClient {
9    inner: TonicDataPlaneClient<Channel>,
10}
11
12impl DataPlaneConsumerClient {
13    pub async fn connect(endpoint: &str) -> Result<Self, anyhow::Error> {
14        let inner = TonicDataPlaneClient::connect(endpoint.to_string()).await.context("connect grpc")?;
15        Ok(Self { inner })
16    }
17
18    /// Performs handshake, transfer and result sequence. Returns result bytes.
19    pub async fn call(&mut self, session_id: &str, session_token: &str, consumer_did: &str, payload: &[u8]) -> Result<Vec<u8>, anyhow::Error> {
20        // Handshake
21        let hs = HandshakeRequest {
22            session_id: session_id.to_string(),
23            session_token: session_token.to_string(),
24            consumer_ephemeral_pub: Vec::new(),
25            consumer_did: consumer_did.to_string(),
26            consumer_did_signature: Vec::new(),
27        };
28        let _ = self.inner.handshake(Request::new(hs)).await.context("handshake failed")?;
29
30        // make owned copies so the stream is 'static
31        let sid = session_id.to_string();
32        let payload_buf = payload.to_vec();
33
34        // Transfer: stream one chunk with sequence 1 and cipher = payload
35        let outbound = stream! {
36            // first chunk include session_id as sequence 0 metadata
37            let meta = serde_json::json!({"session_id": sid});
38            let meta_chunk = EncryptedChunk { ciphertext: meta.to_string().into_bytes(), nonce: Vec::new(), sequence: 0, is_final: false, algorithm: "none".to_string() };
39            yield meta_chunk;
40            let chunk = EncryptedChunk { ciphertext: payload_buf.clone(), nonce: Vec::new(), sequence: 1, is_final: true, algorithm: "none".to_string() };
41            yield chunk;
42        };
43
44        let _ack = self.inner.transfer(Request::new(outbound)).await.context("transfer failed")?;
45
46        // Result: streaming response
47        let mut stream = self.inner.result(Request::new(ResultRequest { session_id: session_id.to_string() })).await?.into_inner();
48        let mut result = Vec::new();
49        while let Some(chunk) = stream.message().await.context("result stream error")? {
50            result.extend_from_slice(&chunk.ciphertext);
51            if chunk.is_final { break; }
52        }
53        Ok(result)
54    }
55}