1use anyhow::Context;
2use bytes::Bytes;
3use dashmap::DashMap;
4use futures::Stream;
5use subtle::ConstantTimeEq;
6use mesh_proto::data_plane_server::DataPlane;
7use mesh_proto::data_plane_server::DataPlaneServer as TonicDataPlaneServer;
8use mesh_proto::{EncryptedChunk, HandshakeRequest, HandshakeResponse, TransferAck, ResultRequest};
9use std::pin::Pin;
10use std::sync::Arc;
11use tonic::{Request, Response, Status, Streaming};
12
13#[derive(Clone, Debug)]
14#[allow(dead_code)] struct SessionEntry {
16 session_token: String,
17 consumer_did: String,
18 payload: Option<Bytes>,
19}
20
21#[derive(Clone)]
22pub struct DataPlaneServerImpl {
23 sessions: Arc<DashMap<String, SessionEntry>>,
24 session_secret: Vec<u8>,
25 provider_did: String,
26}
27
28impl DataPlaneServerImpl {
29 pub fn new(session_secret: Vec<u8>, provider_did: impl Into<String>) -> Self {
30 Self {
31 sessions: Arc::new(DashMap::new()),
32 session_secret,
33 provider_did: provider_did.into(),
34 }
35 }
36}
37
38#[tonic::async_trait]
39impl DataPlane for DataPlaneServerImpl {
40 async fn handshake(&self, req: Request<HandshakeRequest>) -> Result<Response<HandshakeResponse>, Status> {
41 let r = req.into_inner();
42 let session_id = r.session_id.clone();
43 let token = r.session_token.clone();
44 let consumer = r.consumer_did.clone();
45
46 let expected = mesh_session::issue_simple_token(&self.session_secret, &session_id, &consumer, &self.provider_did);
48 let ok: bool = expected.as_bytes().ct_eq(token.as_bytes()).into();
49 if !ok {
50 eprintln!("token mismatch: expected={} provided={}", expected, token);
51 }
52 if !ok {
53 return Err(Status::unauthenticated("invalid session token"));
54 }
55
56 self.sessions.insert(session_id.clone(), SessionEntry { session_token: token.clone(), consumer_did: consumer.clone(), payload: None });
58
59 let resp = HandshakeResponse {
60 provider_ephemeral_pub: Vec::new(),
61 provider_did: self.provider_did.clone(),
62 provider_did_signature: Vec::new(),
63 attestation: None,
64 };
65 Ok(Response::new(resp))
66 }
67
68 async fn transfer(&self, req: Request<Streaming<EncryptedChunk>>) -> Result<Response<TransferAck>, Status> {
69 let mut stream = req.into_inner();
70 let mut session_id_opt: Option<String> = None;
74 let mut buffer = Vec::new();
75 let mut chunks = 0u32;
76
77 while let Some(item) = stream.message().await.map_err(|e| Status::internal(format!("stream error: {}", e)))? {
78 chunks += 1;
79 if session_id_opt.is_none() && item.sequence == 0 {
80 if let Ok(s) = String::from_utf8(item.ciphertext.clone()) {
82 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&s) {
83 if let Some(sid) = v.get("session_id").and_then(|x| x.as_str()) {
84 session_id_opt = Some(sid.to_string());
85 continue;
86 }
87 }
88 }
89 }
90 buffer.extend_from_slice(&item.ciphertext);
92 }
93
94 let session_id = session_id_opt.ok_or_else(|| Status::invalid_argument("missing session_id in first chunk"))?;
95 let payload = Bytes::from(buffer);
96
97 if let Some(mut entry) = self.sessions.get_mut(&session_id) {
98 entry.payload = Some(payload.clone());
99 } else {
100 self.sessions.insert(session_id.clone(), SessionEntry { session_token: String::new(), consumer_did: String::new(), payload: Some(payload.clone()) });
102 }
103
104 let ack = TransferAck { accepted: true, chunks_received: chunks, error_code: None, error_message: None };
105 Ok(Response::new(ack))
106 }
107
108 type ResultStream = Pin<Box<dyn Stream<Item = Result<EncryptedChunk, Status>> + Send + 'static>>;
109
110 async fn result(&self, req: Request<ResultRequest>) -> Result<Response<Self::ResultStream>, Status> {
111 let session_id = req.into_inner().session_id;
112 let sessions = self.sessions.clone();
113
114 let out = async_stream::try_stream! {
115 if let Some(entry) = sessions.get(&session_id) {
116 if let Some(payload) = entry.payload.clone() {
117 let chunk = EncryptedChunk {
118 ciphertext: payload.to_vec(),
119 nonce: Vec::new(),
120 sequence: 1,
121 is_final: true,
122 algorithm: "none".to_string(),
123 };
124 yield chunk;
125 } else {
126 Err(Status::not_found("no payload for session"))?;
127 }
128 } else {
129 Err(Status::not_found("session not found"))?;
130 }
131 };
132
133 Ok(Response::new(Box::pin(out) as Self::ResultStream))
134 }
135
136 type StreamingTaskStream = Pin<Box<dyn Stream<Item = Result<EncryptedChunk, Status>> + Send + 'static>>;
138 async fn streaming_task(&self, _req: Request<Streaming<EncryptedChunk>>) -> Result<Response<Self::StreamingTaskStream>, Status> {
139 Err(Status::unimplemented("StreamingTask not implemented"))
140 }
141}
142
143pub async fn serve(bind_addr: &str, session_secret: Vec<u8>, provider_did: impl Into<String>) -> Result<(), anyhow::Error> {
144 let svc = DataPlaneServerImpl::new(session_secret, provider_did);
145 let addr = bind_addr.parse().context("parse bind_addr")?;
146 tonic::transport::Server::builder()
147 .add_service(TonicDataPlaneServer::new(svc))
148 .serve(addr)
149 .await
150 .context("server failed")?;
151 Ok(())
152}