Skip to main content

mesh_dataplane/
server.rs

1use anyhow::Context;
2use bytes::Bytes;
3use dashmap::DashMap;
4use futures::Stream;
5use subtle::ConstantTimeEq;
6use mesh_proto::data_plane_server::DataPlane;
7use mesh_proto::data_plane_server::DataPlaneServer as TonicDataPlaneServer;
8use mesh_proto::{EncryptedChunk, HandshakeRequest, HandshakeResponse, TransferAck, ResultRequest};
9use std::pin::Pin;
10use std::sync::Arc;
11use tonic::{Request, Response, Status, Streaming};
12
13#[derive(Clone, Debug)]
14#[allow(dead_code)] // fields reserved for future session lifecycle / audit
15struct SessionEntry {
16    session_token: String,
17    consumer_did: String,
18    payload: Option<Bytes>,
19}
20
21#[derive(Clone)]
22pub struct DataPlaneServerImpl {
23    sessions: Arc<DashMap<String, SessionEntry>>,
24    session_secret: Vec<u8>,
25    provider_did: String,
26}
27
28impl DataPlaneServerImpl {
29    pub fn new(session_secret: Vec<u8>, provider_did: impl Into<String>) -> Self {
30        Self {
31            sessions: Arc::new(DashMap::new()),
32            session_secret,
33            provider_did: provider_did.into(),
34        }
35    }
36}
37
38#[tonic::async_trait]
39impl DataPlane for DataPlaneServerImpl {
40    async fn handshake(&self, req: Request<HandshakeRequest>) -> Result<Response<HandshakeResponse>, Status> {
41        let r = req.into_inner();
42        let session_id = r.session_id.clone();
43        let token = r.session_token.clone();
44        let consumer = r.consumer_did.clone();
45
46        // Validate token using mesh-session HMAC validator
47        let expected = mesh_session::issue_simple_token(&self.session_secret, &session_id, &consumer, &self.provider_did);
48        let ok: bool = expected.as_bytes().ct_eq(token.as_bytes()).into();
49        if !ok {
50            eprintln!("token mismatch: expected={} provided={}", expected, token);
51        }
52        if !ok {
53            return Err(Status::unauthenticated("invalid session token"));
54        }
55
56        // store session
57        self.sessions.insert(session_id.clone(), SessionEntry { session_token: token.clone(), consumer_did: consumer.clone(), payload: None });
58
59        let resp = HandshakeResponse {
60            provider_ephemeral_pub: Vec::new(),
61            provider_did: self.provider_did.clone(),
62            provider_did_signature: Vec::new(),
63            attestation: None,
64        };
65        Ok(Response::new(resp))
66    }
67
68    async fn transfer(&self, req: Request<Streaming<EncryptedChunk>>) -> Result<Response<TransferAck>, Status> {
69        let mut stream = req.into_inner();
70        // For this simple implementation, the client should have sent a first chunk containing session_id in metadata; we accept chunks and reconstruct a payload keyed by session id inside ciphertext's first bytes encoded as UTF-8 prefix JSON.
71        // Simpler: expect first chunk sequence==0 with plaintext JSON {"session_id":"..."} as ciphertext.
72
73        let mut session_id_opt: Option<String> = None;
74        let mut buffer = Vec::new();
75        let mut chunks = 0u32;
76
77        while let Some(item) = stream.message().await.map_err(|e| Status::internal(format!("stream error: {}", e)))? {
78            chunks += 1;
79            if session_id_opt.is_none() && item.sequence == 0 {
80                // try to parse the ciphertext as UTF-8 JSON
81                if let Ok(s) = String::from_utf8(item.ciphertext.clone()) {
82                    if let Ok(v) = serde_json::from_str::<serde_json::Value>(&s) {
83                        if let Some(sid) = v.get("session_id").and_then(|x| x.as_str()) {
84                            session_id_opt = Some(sid.to_string());
85                            continue;
86                        }
87                    }
88                }
89            }
90            // append ciphertext as payload
91            buffer.extend_from_slice(&item.ciphertext);
92        }
93
94        let session_id = session_id_opt.ok_or_else(|| Status::invalid_argument("missing session_id in first chunk"))?;
95        let payload = Bytes::from(buffer);
96
97        if let Some(mut entry) = self.sessions.get_mut(&session_id) {
98            entry.payload = Some(payload.clone());
99        } else {
100            // create session entry if missing
101            self.sessions.insert(session_id.clone(), SessionEntry { session_token: String::new(), consumer_did: String::new(), payload: Some(payload.clone()) });
102        }
103
104        let ack = TransferAck { accepted: true, chunks_received: chunks, error_code: None, error_message: None };
105        Ok(Response::new(ack))
106    }
107
108    type ResultStream = Pin<Box<dyn Stream<Item = Result<EncryptedChunk, Status>> + Send + 'static>>;
109
110    async fn result(&self, req: Request<ResultRequest>) -> Result<Response<Self::ResultStream>, Status> {
111        let session_id = req.into_inner().session_id;
112        let sessions = self.sessions.clone();
113
114        let out = async_stream::try_stream! {
115            if let Some(entry) = sessions.get(&session_id) {
116                if let Some(payload) = entry.payload.clone() {
117                    let chunk = EncryptedChunk {
118                        ciphertext: payload.to_vec(),
119                        nonce: Vec::new(),
120                        sequence: 1,
121                        is_final: true,
122                        algorithm: "none".to_string(),
123                    };
124                    yield chunk;
125                } else {
126                    Err(Status::not_found("no payload for session"))?;
127                }
128            } else {
129                Err(Status::not_found("session not found"))?;
130            }
131        };
132
133        Ok(Response::new(Box::pin(out) as Self::ResultStream))
134    }
135
136    // StreamingTask not implemented in this MVP
137    type StreamingTaskStream = Pin<Box<dyn Stream<Item = Result<EncryptedChunk, Status>> + Send + 'static>>;
138    async fn streaming_task(&self, _req: Request<Streaming<EncryptedChunk>>) -> Result<Response<Self::StreamingTaskStream>, Status> {
139        Err(Status::unimplemented("StreamingTask not implemented"))
140    }
141}
142
143pub async fn serve(bind_addr: &str, session_secret: Vec<u8>, provider_did: impl Into<String>) -> Result<(), anyhow::Error> {
144    let svc = DataPlaneServerImpl::new(session_secret, provider_did);
145    let addr = bind_addr.parse().context("parse bind_addr")?;
146    tonic::transport::Server::builder()
147        .add_service(TonicDataPlaneServer::new(svc))
148        .serve(addr)
149        .await
150        .context("server failed")?;
151    Ok(())
152}