Skip to main content

vyn_core/
relay_storage.rs

1use std::path::Path;
2use std::process::{Command, Stdio};
3use std::sync::Arc;
4
5use tonic::Request;
6use tonic::metadata::MetadataValue;
7use tonic::transport::{Channel, Endpoint};
8
9use crate::storage::{StorageError, StorageProvider, StorageResult};
10
11#[derive(Clone)]
12pub struct RelayStorageProvider {
13    endpoint: String,
14    pub token: Arc<tokio::sync::RwLock<Option<String>>>,
15}
16
17impl RelayStorageProvider {
18    pub fn new(endpoint: impl Into<String>) -> Self {
19        Self {
20            endpoint: endpoint.into(),
21            token: Arc::new(tokio::sync::RwLock::new(None)),
22        }
23    }
24
25    async fn connect(
26        &self,
27    ) -> StorageResult<vyn_relay::proto::vyn_relay_client::VynRelayClient<Channel>> {
28        let endpoint = Endpoint::from_shared(self.endpoint.clone())
29            .map_err(|err| StorageError::Transport(err.to_string()))?;
30        vyn_relay::proto::vyn_relay_client::VynRelayClient::connect(endpoint)
31            .await
32            .map_err(|err| StorageError::Transport(err.to_string()))
33    }
34
35    /// Perform the two-step challenge-response auth and store the session token.
36    pub async fn authenticate(
37        &self,
38        user_id: &str,
39        sign_fn: impl Fn(&[u8]) -> StorageResult<String>,
40    ) -> StorageResult<()> {
41        let mut client = self.connect().await?;
42
43        // Step 1: request challenge
44        let challenge_resp = client
45            .authenticate(Request::new(vyn_relay::proto::AuthRequest {
46                user_id: user_id.to_string(),
47                nonce: Vec::new(),
48                signature: String::new(),
49            }))
50            .await
51            .map_err(|e| StorageError::Transport(e.to_string()))?
52            .into_inner();
53
54        let nonce = challenge_resp.challenge_nonce;
55
56        // Step 2: sign nonce and send back
57        let signature = sign_fn(&nonce)?;
58        let auth_resp = client
59            .authenticate(Request::new(vyn_relay::proto::AuthRequest {
60                user_id: user_id.to_string(),
61                nonce: nonce.clone(),
62                signature,
63            }))
64            .await
65            .map_err(|e| StorageError::Transport(e.to_string()))?
66            .into_inner();
67
68        if !auth_resp.authenticated {
69            return Err(StorageError::Transport(
70                "relay authentication failed".to_string(),
71            ));
72        }
73
74        *self.token.write().await = Some(auth_resp.token);
75        Ok(())
76    }
77
78    /// Register identity on the relay (idempotent) then authenticate.
79    /// `vault_dir` is the `.vyn/` directory (contains `identity.toml`).
80    pub async fn authenticate_with_identity(&self, vault_dir: &Path) -> StorageResult<()> {
81        // Check for a cached session token first.
82        let token_path = vault_dir.join("session.token");
83        if token_path.exists() {
84            let cached = std::fs::read_to_string(&token_path).unwrap_or_default();
85            let cached = cached.trim().to_string();
86            if !cached.is_empty() {
87                *self.token.write().await = Some(cached);
88                return Ok(());
89            }
90        }
91
92        let identity = load_identity(vault_dir)?;
93        let private_key_path = identity.ssh_private_key.clone();
94        let public_key_path = identity.ssh_public_key.clone();
95        let user_id = identity.github_username.clone();
96
97        let public_key = std::fs::read_to_string(&public_key_path)
98            .map_err(|e| StorageError::Transport(format!("failed to read public key: {e}")))?;
99        let public_key = public_key.trim().to_string();
100
101        // Register identity on the relay (no-op if already registered with same key)
102        self.ensure_identity_registered(&user_id, &public_key, &private_key_path)
103            .await?;
104
105        let private_key_path2 = private_key_path.clone();
106        self.authenticate(&user_id, move |nonce| {
107            sign_nonce_with_ssh_key(nonce, Path::new(&private_key_path2))
108        })
109        .await?;
110
111        // Cache the session token for future runs.
112        if let Some(ref tok) = *self.token.read().await {
113            let _ = write_token_file(&token_path, tok);
114        }
115        Ok(())
116    }
117
118    pub fn clear_cached_token(vault_dir: &Path) {
119        let token_path = vault_dir.join("session.token");
120        let _ = std::fs::remove_file(token_path);
121    }
122
123    async fn ensure_identity_registered(
124        &self,
125        user_id: &str,
126        public_key: &str,
127        private_key_path: &str,
128    ) -> StorageResult<()> {
129        let registration_payload = format!("vyn-register:{user_id}:{public_key}");
130        let signature =
131            sign_nonce_with_ssh_key(registration_payload.as_bytes(), Path::new(private_key_path))?;
132
133        let mut client = self.connect().await?;
134        client
135            .register_identity(Request::new(vyn_relay::proto::RegisterRequest {
136                user_id: user_id.to_string(),
137                public_key: public_key.to_string(),
138                signature,
139            }))
140            .await
141            .map_err(|e| StorageError::Transport(e.to_string()))?;
142
143        Ok(())
144    }
145
146    async fn inject_token<T>(&self, mut request: Request<T>) -> StorageResult<Request<T>> {
147        if let Some(ref tok) = *self.token.read().await {
148            let val = MetadataValue::try_from(tok.as_str())
149                .map_err(|e| StorageError::Transport(e.to_string()))?;
150            request.metadata_mut().insert("x-vyn-token", val);
151        }
152        Ok(request)
153    }
154}
155
156impl StorageProvider for RelayStorageProvider {
157    async fn get_manifest(&self, project_id: &str) -> StorageResult<Option<Vec<u8>>> {
158        let mut client = self.connect().await?;
159        let response = client
160            .get_manifest(Request::new(vyn_relay::proto::GetManifestRequest {
161                project_id: project_id.to_string(),
162            }))
163            .await
164            .map_err(|err| StorageError::Transport(err.to_string()))?
165            .into_inner();
166
167        if response.found {
168            Ok(Some(response.payload))
169        } else {
170            Ok(None)
171        }
172    }
173
174    async fn put_manifest(&self, project_id: &str, manifest_payload: &[u8]) -> StorageResult<()> {
175        let mut client = self.connect().await?;
176        let req = self
177            .inject_token(Request::new(vyn_relay::proto::PutManifestRequest {
178                project_id: project_id.to_string(),
179                payload: manifest_payload.to_vec(),
180            }))
181            .await?;
182        client
183            .put_manifest(req)
184            .await
185            .map_err(|err| StorageError::Transport(err.to_string()))?;
186        Ok(())
187    }
188
189    async fn upload_blob(&self, hash: &str, data: Vec<u8>) -> StorageResult<()> {
190        let mut client = self.connect().await?;
191        let message = vyn_relay::proto::UploadBlobChunk {
192            hash: hash.to_string(),
193            chunk: data,
194        };
195
196        let mut req = Request::new(tokio_stream::iter(vec![message]));
197        if let Some(ref tok) = *self.token.read().await {
198            let val = MetadataValue::try_from(tok.as_str())
199                .map_err(|e| StorageError::Transport(e.to_string()))?;
200            req.metadata_mut().insert("x-vyn-token", val);
201        }
202        client
203            .upload_blob(req)
204            .await
205            .map_err(|err| StorageError::Transport(err.to_string()))?;
206        Ok(())
207    }
208
209    async fn download_blob(&self, hash: &str) -> StorageResult<Option<Vec<u8>>> {
210        let mut client = self.connect().await?;
211        let stream = client
212            .download_blob(Request::new(vyn_relay::proto::DownloadBlobRequest {
213                hash: hash.to_string(),
214            }))
215            .await
216            .map_err(|err| {
217                if err.code() == tonic::Code::NotFound {
218                    StorageError::NotFound
219                } else {
220                    StorageError::Transport(err.to_string())
221                }
222            });
223
224        if let Err(StorageError::NotFound) = stream {
225            return Ok(None);
226        }
227
228        let mut stream = stream?.into_inner();
229
230        let mut out = Vec::new();
231        while let Some(chunk) = stream
232            .message()
233            .await
234            .map_err(|err| StorageError::Transport(err.to_string()))?
235        {
236            out.extend_from_slice(&chunk.chunk);
237        }
238
239        Ok(Some(out))
240    }
241
242    async fn create_invite(
243        &self,
244        user_id: &str,
245        vault_id: &str,
246        payload: Vec<u8>,
247    ) -> StorageResult<()> {
248        let mut client = self.connect().await?;
249        let req = self
250            .inject_token(Request::new(vyn_relay::proto::CreateInviteRequest {
251                user_id: user_id.to_string(),
252                vault_id: vault_id.to_string(),
253                payload,
254            }))
255            .await?;
256        client
257            .create_invite(req)
258            .await
259            .map_err(|err| StorageError::Transport(err.to_string()))?;
260        Ok(())
261    }
262
263    async fn get_invites(&self, user_id: &str, vault_id: &str) -> StorageResult<Vec<Vec<u8>>> {
264        let mut client = self.connect().await?;
265        let response = client
266            .get_invites(Request::new(vyn_relay::proto::GetInvitesRequest {
267                user_id: user_id.to_string(),
268                vault_id: vault_id.to_string(),
269            }))
270            .await
271            .map_err(|err| StorageError::Transport(err.to_string()))?
272            .into_inner();
273        Ok(response.payloads)
274    }
275}
276
277impl RelayStorageProvider {
278    /// Lists all vault IDs on the relay (requires auth token).
279    pub async fn list_vaults(&self) -> StorageResult<Vec<String>> {
280        let mut client = self.connect().await?;
281        let mut request = tonic::Request::new(vyn_relay::proto::ListVaultsRequest {});
282        request = self.inject_token(request).await?;
283        let response = client
284            .list_vaults(request)
285            .await
286            .map_err(|err| StorageError::Transport(err.to_string()))?
287            .into_inner();
288        Ok(response.vault_ids)
289    }
290
291    /// Lists all blobs on the relay (requires auth token).
292    pub async fn list_blobs(&self) -> StorageResult<Vec<(String, u64)>> {
293        let mut client = self.connect().await?;
294        let mut request = tonic::Request::new(vyn_relay::proto::ListBlobsRequest {
295            vault_id: String::new(),
296        });
297        request = self.inject_token(request).await?;
298        let response = client
299            .list_blobs(request)
300            .await
301            .map_err(|err| StorageError::Transport(err.to_string()))?
302            .into_inner();
303        Ok(response
304            .blobs
305            .into_iter()
306            .map(|b| (b.sha256, b.size_bytes))
307            .collect())
308    }
309}
310
311struct IdentityConfig {
312    github_username: String,
313    ssh_private_key: String,
314    ssh_public_key: String,
315}
316
317fn load_identity(vault_dir: &Path) -> StorageResult<IdentityConfig> {
318    let path = vault_dir.join("identity.toml");
319    let text = std::fs::read_to_string(&path)
320        .map_err(|e| StorageError::Transport(format!("failed to read identity.toml: {e}")))?;
321    let github_username = parse_toml_string(&text, "github_username").ok_or_else(|| {
322        StorageError::Transport("missing github_username in identity.toml".into())
323    })?;
324    let ssh_private_key = parse_toml_string(&text, "ssh_private_key").ok_or_else(|| {
325        StorageError::Transport("missing ssh_private_key in identity.toml".into())
326    })?;
327    let ssh_public_key = parse_toml_string(&text, "ssh_public_key")
328        .ok_or_else(|| StorageError::Transport("missing ssh_public_key in identity.toml".into()))?;
329    Ok(IdentityConfig {
330        github_username,
331        ssh_private_key,
332        ssh_public_key,
333    })
334}
335
336fn parse_toml_string(text: &str, key: &str) -> Option<String> {
337    for line in text.lines() {
338        let line = line.trim();
339        if let Some(rest) = line.strip_prefix(key) {
340            let rest = rest.trim().strip_prefix('=')?;
341            let val = rest.trim().trim_matches('"');
342            return Some(val.to_string());
343        }
344    }
345    None
346}
347
348fn write_token_file(path: &Path, token: &str) -> std::io::Result<()> {
349    std::fs::write(path, token)?;
350    #[cfg(unix)]
351    {
352        use std::os::unix::fs::PermissionsExt;
353        std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
354    }
355    Ok(())
356}
357
358fn sign_nonce_with_ssh_key(nonce: &[u8], private_key: &Path) -> StorageResult<String> {
359    let tmp = tempfile::TempDir::new()
360        .map_err(|e| StorageError::Transport(format!("failed to create temp dir: {e}")))?;
361    let nonce_file = tmp.path().join("nonce");
362    std::fs::write(&nonce_file, nonce)
363        .map_err(|e| StorageError::Transport(format!("failed to write nonce: {e}")))?;
364
365    let status = Command::new("ssh-keygen")
366        .args([
367            "-Y",
368            "sign",
369            "-f",
370            private_key.to_str().unwrap_or(""),
371            "-n",
372            "vyn",
373            nonce_file.to_str().unwrap_or(""),
374        ])
375        .stdout(Stdio::null())
376        .stderr(Stdio::null())
377        .status()
378        .map_err(|e| StorageError::Transport(format!("failed to run ssh-keygen: {e}")))?;
379
380    if !status.success() {
381        return Err(StorageError::Transport("ssh-keygen signing failed".into()));
382    }
383
384    let sig_file = tmp.path().join("nonce.sig");
385    let sig = std::fs::read_to_string(&sig_file)
386        .map_err(|e| StorageError::Transport(format!("failed to read signature: {e}")))?;
387    Ok(sig)
388}
389
390#[cfg(test)]
391mod tests {
392    use crate::crypto::secret_bytes;
393    use crate::manifest::Manifest;
394    use crate::storage::{StorageProvider, decrypt_manifest, encrypt_manifest};
395
396    use super::RelayStorageProvider;
397
398    #[tokio::test]
399    async fn relay_roundtrip() {
400        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
401            .await
402            .expect("bind test port");
403        let port = listener.local_addr().expect("read local addr").port();
404
405        let data_dir = std::env::temp_dir().join(format!("vyn-relay-it-{}", uuid::Uuid::new_v4()));
406        let data_dir_string = data_dir.to_string_lossy().to_string();
407
408        let handle = tokio::spawn(async move {
409            let _ = vyn_relay::serve_with_listener(listener, data_dir_string).await;
410        });
411
412        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
413
414        let provider = RelayStorageProvider::new(format!("http://127.0.0.1:{port}"));
415        // Inject the test bypass token so write RPCs pass auth in test builds.
416        *provider.token.write().await = Some("test-bypass".to_string());
417
418        let mut uploaded = false;
419        for _ in 0..20 {
420            if provider
421                .upload_blob("blob123", b"hello-relay".to_vec())
422                .await
423                .is_ok()
424            {
425                uploaded = true;
426                break;
427            }
428            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
429        }
430        assert!(uploaded, "blob upload should succeed once relay is ready");
431        let blob = provider
432            .download_blob("blob123")
433            .await
434            .expect("blob download should succeed")
435            .expect("blob should exist");
436        assert_eq!(blob, b"hello-relay");
437
438        let key = secret_bytes(vec![3u8; 32]);
439        let manifest = Manifest {
440            version: 1,
441            files: vec![],
442        };
443        let payload = encrypt_manifest(&manifest, &key).expect("encrypt manifest");
444        provider
445            .put_manifest("vault-it", &payload)
446            .await
447            .expect("put manifest should succeed");
448        let pulled = provider
449            .get_manifest("vault-it")
450            .await
451            .expect("get manifest should succeed")
452            .expect("manifest should exist");
453        let restored = decrypt_manifest(&pulled, &key).expect("decrypt manifest");
454        assert_eq!(restored.version, 1);
455
456        provider
457            .create_invite("alice", "vault-it", b"invite-data".to_vec())
458            .await
459            .expect("create invite should succeed");
460        let invites = provider
461            .get_invites("alice", "vault-it")
462            .await
463            .expect("get invites should succeed");
464        assert_eq!(invites.len(), 1);
465        assert_eq!(invites[0], b"invite-data");
466
467        handle.abort();
468        let _ = std::fs::remove_dir_all(data_dir);
469    }
470}