1use std::path::Path;
2use std::process::{Command, Stdio};
3use std::sync::Arc;
4
5use tonic::Request;
6use tonic::metadata::MetadataValue;
7use tonic::transport::{Channel, Endpoint};
8
9use crate::storage::{StorageError, StorageProvider, StorageResult};
10
11#[derive(Clone)]
12pub struct RelayStorageProvider {
13 endpoint: String,
14 pub token: Arc<tokio::sync::RwLock<Option<String>>>,
15}
16
17impl RelayStorageProvider {
18 pub fn new(endpoint: impl Into<String>) -> Self {
19 Self {
20 endpoint: endpoint.into(),
21 token: Arc::new(tokio::sync::RwLock::new(None)),
22 }
23 }
24
25 async fn connect(
26 &self,
27 ) -> StorageResult<vyn_relay::proto::vyn_relay_client::VynRelayClient<Channel>> {
28 let endpoint = Endpoint::from_shared(self.endpoint.clone())
29 .map_err(|err| StorageError::Transport(err.to_string()))?;
30 vyn_relay::proto::vyn_relay_client::VynRelayClient::connect(endpoint)
31 .await
32 .map_err(|err| StorageError::Transport(err.to_string()))
33 }
34
35 pub async fn authenticate(
37 &self,
38 user_id: &str,
39 sign_fn: impl Fn(&[u8]) -> StorageResult<String>,
40 ) -> StorageResult<()> {
41 let mut client = self.connect().await?;
42
43 let challenge_resp = client
45 .authenticate(Request::new(vyn_relay::proto::AuthRequest {
46 user_id: user_id.to_string(),
47 nonce: Vec::new(),
48 signature: String::new(),
49 }))
50 .await
51 .map_err(|e| StorageError::Transport(e.to_string()))?
52 .into_inner();
53
54 let nonce = challenge_resp.challenge_nonce;
55
56 let signature = sign_fn(&nonce)?;
58 let auth_resp = client
59 .authenticate(Request::new(vyn_relay::proto::AuthRequest {
60 user_id: user_id.to_string(),
61 nonce: nonce.clone(),
62 signature,
63 }))
64 .await
65 .map_err(|e| StorageError::Transport(e.to_string()))?
66 .into_inner();
67
68 if !auth_resp.authenticated {
69 return Err(StorageError::Transport(
70 "relay authentication failed".to_string(),
71 ));
72 }
73
74 *self.token.write().await = Some(auth_resp.token);
75 Ok(())
76 }
77
78 pub async fn authenticate_with_identity(&self, vault_dir: &Path) -> StorageResult<()> {
81 let token_path = vault_dir.join("session.token");
83 if token_path.exists() {
84 let cached = std::fs::read_to_string(&token_path).unwrap_or_default();
85 let cached = cached.trim().to_string();
86 if !cached.is_empty() {
87 *self.token.write().await = Some(cached);
88 return Ok(());
89 }
90 }
91
92 let identity = load_identity(vault_dir)?;
93 let private_key_path = identity.ssh_private_key.clone();
94 let public_key_path = identity.ssh_public_key.clone();
95 let user_id = identity.github_username.clone();
96
97 let public_key = std::fs::read_to_string(&public_key_path)
98 .map_err(|e| StorageError::Transport(format!("failed to read public key: {e}")))?;
99 let public_key = public_key.trim().to_string();
100
101 self.ensure_identity_registered(&user_id, &public_key, &private_key_path)
103 .await?;
104
105 let private_key_path2 = private_key_path.clone();
106 self.authenticate(&user_id, move |nonce| {
107 sign_nonce_with_ssh_key(nonce, Path::new(&private_key_path2))
108 })
109 .await?;
110
111 if let Some(ref tok) = *self.token.read().await {
113 let _ = write_token_file(&token_path, tok);
114 }
115 Ok(())
116 }
117
118 pub fn clear_cached_token(vault_dir: &Path) {
119 let token_path = vault_dir.join("session.token");
120 let _ = std::fs::remove_file(token_path);
121 }
122
123 async fn ensure_identity_registered(
124 &self,
125 user_id: &str,
126 public_key: &str,
127 private_key_path: &str,
128 ) -> StorageResult<()> {
129 let registration_payload = format!("vyn-register:{user_id}:{public_key}");
130 let signature =
131 sign_nonce_with_ssh_key(registration_payload.as_bytes(), Path::new(private_key_path))?;
132
133 let mut client = self.connect().await?;
134 client
135 .register_identity(Request::new(vyn_relay::proto::RegisterRequest {
136 user_id: user_id.to_string(),
137 public_key: public_key.to_string(),
138 signature,
139 }))
140 .await
141 .map_err(|e| StorageError::Transport(e.to_string()))?;
142
143 Ok(())
144 }
145
146 async fn inject_token<T>(&self, mut request: Request<T>) -> StorageResult<Request<T>> {
147 if let Some(ref tok) = *self.token.read().await {
148 let val = MetadataValue::try_from(tok.as_str())
149 .map_err(|e| StorageError::Transport(e.to_string()))?;
150 request.metadata_mut().insert("x-vyn-token", val);
151 }
152 Ok(request)
153 }
154}
155
156impl StorageProvider for RelayStorageProvider {
157 async fn get_manifest(&self, project_id: &str) -> StorageResult<Option<Vec<u8>>> {
158 let mut client = self.connect().await?;
159 let response = client
160 .get_manifest(Request::new(vyn_relay::proto::GetManifestRequest {
161 project_id: project_id.to_string(),
162 }))
163 .await
164 .map_err(|err| StorageError::Transport(err.to_string()))?
165 .into_inner();
166
167 if response.found {
168 Ok(Some(response.payload))
169 } else {
170 Ok(None)
171 }
172 }
173
174 async fn put_manifest(&self, project_id: &str, manifest_payload: &[u8]) -> StorageResult<()> {
175 let mut client = self.connect().await?;
176 let req = self
177 .inject_token(Request::new(vyn_relay::proto::PutManifestRequest {
178 project_id: project_id.to_string(),
179 payload: manifest_payload.to_vec(),
180 }))
181 .await?;
182 client
183 .put_manifest(req)
184 .await
185 .map_err(|err| StorageError::Transport(err.to_string()))?;
186 Ok(())
187 }
188
189 async fn upload_blob(&self, hash: &str, data: Vec<u8>) -> StorageResult<()> {
190 let mut client = self.connect().await?;
191 let message = vyn_relay::proto::UploadBlobChunk {
192 hash: hash.to_string(),
193 chunk: data,
194 };
195
196 let mut req = Request::new(tokio_stream::iter(vec![message]));
197 if let Some(ref tok) = *self.token.read().await {
198 let val = MetadataValue::try_from(tok.as_str())
199 .map_err(|e| StorageError::Transport(e.to_string()))?;
200 req.metadata_mut().insert("x-vyn-token", val);
201 }
202 client
203 .upload_blob(req)
204 .await
205 .map_err(|err| StorageError::Transport(err.to_string()))?;
206 Ok(())
207 }
208
209 async fn download_blob(&self, hash: &str) -> StorageResult<Option<Vec<u8>>> {
210 let mut client = self.connect().await?;
211 let stream = client
212 .download_blob(Request::new(vyn_relay::proto::DownloadBlobRequest {
213 hash: hash.to_string(),
214 }))
215 .await
216 .map_err(|err| {
217 if err.code() == tonic::Code::NotFound {
218 StorageError::NotFound
219 } else {
220 StorageError::Transport(err.to_string())
221 }
222 });
223
224 if let Err(StorageError::NotFound) = stream {
225 return Ok(None);
226 }
227
228 let mut stream = stream?.into_inner();
229
230 let mut out = Vec::new();
231 while let Some(chunk) = stream
232 .message()
233 .await
234 .map_err(|err| StorageError::Transport(err.to_string()))?
235 {
236 out.extend_from_slice(&chunk.chunk);
237 }
238
239 Ok(Some(out))
240 }
241
242 async fn create_invite(
243 &self,
244 user_id: &str,
245 vault_id: &str,
246 payload: Vec<u8>,
247 ) -> StorageResult<()> {
248 let mut client = self.connect().await?;
249 let req = self
250 .inject_token(Request::new(vyn_relay::proto::CreateInviteRequest {
251 user_id: user_id.to_string(),
252 vault_id: vault_id.to_string(),
253 payload,
254 }))
255 .await?;
256 client
257 .create_invite(req)
258 .await
259 .map_err(|err| StorageError::Transport(err.to_string()))?;
260 Ok(())
261 }
262
263 async fn get_invites(&self, user_id: &str, vault_id: &str) -> StorageResult<Vec<Vec<u8>>> {
264 let mut client = self.connect().await?;
265 let response = client
266 .get_invites(Request::new(vyn_relay::proto::GetInvitesRequest {
267 user_id: user_id.to_string(),
268 vault_id: vault_id.to_string(),
269 }))
270 .await
271 .map_err(|err| StorageError::Transport(err.to_string()))?
272 .into_inner();
273 Ok(response.payloads)
274 }
275}
276
277impl RelayStorageProvider {
278 pub async fn list_vaults(&self) -> StorageResult<Vec<String>> {
280 let mut client = self.connect().await?;
281 let mut request = tonic::Request::new(vyn_relay::proto::ListVaultsRequest {});
282 request = self.inject_token(request).await?;
283 let response = client
284 .list_vaults(request)
285 .await
286 .map_err(|err| StorageError::Transport(err.to_string()))?
287 .into_inner();
288 Ok(response.vault_ids)
289 }
290
291 pub async fn list_blobs(&self) -> StorageResult<Vec<(String, u64)>> {
293 let mut client = self.connect().await?;
294 let mut request = tonic::Request::new(vyn_relay::proto::ListBlobsRequest {
295 vault_id: String::new(),
296 });
297 request = self.inject_token(request).await?;
298 let response = client
299 .list_blobs(request)
300 .await
301 .map_err(|err| StorageError::Transport(err.to_string()))?
302 .into_inner();
303 Ok(response
304 .blobs
305 .into_iter()
306 .map(|b| (b.sha256, b.size_bytes))
307 .collect())
308 }
309}
310
/// Identity settings read from a vault's `identity.toml`.
struct IdentityConfig {
    /// Value of the `github_username` key; used as the relay user id.
    github_username: String,
    /// Value of the `ssh_private_key` key; treated as a filesystem path to the signing key.
    ssh_private_key: String,
    /// Value of the `ssh_public_key` key; treated as a filesystem path to the public key file.
    ssh_public_key: String,
}
316
317fn load_identity(vault_dir: &Path) -> StorageResult<IdentityConfig> {
318 let path = vault_dir.join("identity.toml");
319 let text = std::fs::read_to_string(&path)
320 .map_err(|e| StorageError::Transport(format!("failed to read identity.toml: {e}")))?;
321 let github_username = parse_toml_string(&text, "github_username").ok_or_else(|| {
322 StorageError::Transport("missing github_username in identity.toml".into())
323 })?;
324 let ssh_private_key = parse_toml_string(&text, "ssh_private_key").ok_or_else(|| {
325 StorageError::Transport("missing ssh_private_key in identity.toml".into())
326 })?;
327 let ssh_public_key = parse_toml_string(&text, "ssh_public_key")
328 .ok_or_else(|| StorageError::Transport("missing ssh_public_key in identity.toml".into()))?;
329 Ok(IdentityConfig {
330 github_username,
331 ssh_private_key,
332 ssh_public_key,
333 })
334}
335
/// Extracts the value of a top-level `key = "value"` line from `text`.
///
/// This is a deliberately minimal line scanner, not a TOML parser: each line
/// is split at its first `=`, the left side must equal `key` exactly after
/// trimming, and the right side is returned with surrounding whitespace and
/// any leading/trailing `"` characters removed. Lines without `=` or with a
/// different key are skipped. Tables, escapes, and trailing comments are not
/// interpreted.
///
/// Returns the first matching value, or `None` if no line defines `key`.
fn parse_toml_string(text: &str, key: &str) -> Option<String> {
    text.lines().find_map(|line| {
        let (name, value) = line.split_once('=')?;
        // Exact comparison: a longer key that merely starts with `key`
        // (e.g. `ssh_private_key_path`) must not match or abort the search.
        (name.trim() == key).then(|| value.trim().trim_matches('"').to_string())
    })
}
347
/// Writes `token` to `path`, replacing any previous contents.
///
/// On Unix the file is created with mode `0o600` and its permissions are
/// forced to `0o600` *before* the token is written, so the secret is never on
/// disk under wider permissions — neither for a freshly created file nor for
/// a pre-existing one. On other platforms the token is simply written.
///
/// # Errors
/// Returns the underlying I/O error if the file cannot be opened, its
/// permissions cannot be set, or the write fails.
fn write_token_file(path: &Path, token: &str) -> std::io::Result<()> {
    use std::io::Write;

    let mut options = std::fs::OpenOptions::new();
    options.write(true).create(true).truncate(true);
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        // Applies only when the file is created; still subject to the umask.
        options.mode(0o600);
    }

    let mut file = options.open(path)?;

    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        // Covers a pre-existing file that was created with looser permissions.
        file.set_permissions(std::fs::Permissions::from_mode(0o600))?;
    }

    file.write_all(token.as_bytes())?;
    Ok(())
}
357
358fn sign_nonce_with_ssh_key(nonce: &[u8], private_key: &Path) -> StorageResult<String> {
359 let tmp = tempfile::TempDir::new()
360 .map_err(|e| StorageError::Transport(format!("failed to create temp dir: {e}")))?;
361 let nonce_file = tmp.path().join("nonce");
362 std::fs::write(&nonce_file, nonce)
363 .map_err(|e| StorageError::Transport(format!("failed to write nonce: {e}")))?;
364
365 let status = Command::new("ssh-keygen")
366 .args([
367 "-Y",
368 "sign",
369 "-f",
370 private_key.to_str().unwrap_or(""),
371 "-n",
372 "vyn",
373 nonce_file.to_str().unwrap_or(""),
374 ])
375 .stdout(Stdio::null())
376 .stderr(Stdio::null())
377 .status()
378 .map_err(|e| StorageError::Transport(format!("failed to run ssh-keygen: {e}")))?;
379
380 if !status.success() {
381 return Err(StorageError::Transport("ssh-keygen signing failed".into()));
382 }
383
384 let sig_file = tmp.path().join("nonce.sig");
385 let sig = std::fs::read_to_string(&sig_file)
386 .map_err(|e| StorageError::Transport(format!("failed to read signature: {e}")))?;
387 Ok(sig)
388}
389
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::crypto::secret_bytes;
    use crate::manifest::Manifest;
    use crate::storage::{StorageProvider, decrypt_manifest, encrypt_manifest};

    use super::RelayStorageProvider;

    /// End-to-end check against an in-process relay: blob upload/download,
    /// encrypted manifest put/get, and invite create/list.
    #[tokio::test]
    async fn relay_roundtrip() {
        const PAUSE: Duration = Duration::from_millis(100);

        // Port 0 lets the OS pick a free port for this run.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind test port");
        let port = listener.local_addr().expect("read local addr").port();

        let data_dir = std::env::temp_dir().join(format!("vyn-relay-it-{}", uuid::Uuid::new_v4()));
        let relay_data_dir = data_dir.to_string_lossy().to_string();

        let relay = tokio::spawn(async move {
            let _ = vyn_relay::serve_with_listener(listener, relay_data_dir).await;
        });

        tokio::time::sleep(PAUSE).await;

        let provider = RelayStorageProvider::new(format!("http://127.0.0.1:{port}"));
        *provider.token.write().await = Some("test-bypass".to_string());

        // The relay may still be starting: try up to 20 times, pausing after each failure.
        let mut attempts_left = 20;
        let uploaded = loop {
            if attempts_left == 0 {
                break false;
            }
            attempts_left -= 1;

            let attempt = provider
                .upload_blob("blob123", b"hello-relay".to_vec())
                .await;
            if attempt.is_ok() {
                break true;
            }
            tokio::time::sleep(PAUSE).await;
        };
        assert!(uploaded, "blob upload should succeed once relay is ready");

        let blob = provider
            .download_blob("blob123")
            .await
            .expect("blob download should succeed")
            .expect("blob should exist");
        assert_eq!(blob, b"hello-relay");

        let key = secret_bytes(vec![3u8; 32]);
        let manifest = Manifest {
            version: 1,
            files: vec![],
        };
        let payload = encrypt_manifest(&manifest, &key).expect("encrypt manifest");
        provider
            .put_manifest("vault-it", &payload)
            .await
            .expect("put manifest should succeed");
        let pulled = provider
            .get_manifest("vault-it")
            .await
            .expect("get manifest should succeed")
            .expect("manifest should exist");
        let restored = decrypt_manifest(&pulled, &key).expect("decrypt manifest");
        assert_eq!(restored.version, 1);

        provider
            .create_invite("alice", "vault-it", b"invite-data".to_vec())
            .await
            .expect("create invite should succeed");
        let invites = provider
            .get_invites("alice", "vault-it")
            .await
            .expect("get invites should succeed");
        assert_eq!(invites.len(), 1);
        assert_eq!(invites[0], b"invite-data");

        relay.abort();
        let _ = std::fs::remove_dir_all(data_dir);
    }
}