// vyn_core/relay_storage.rs

1use std::path::Path;
2use std::process::{Command, Stdio};
3use std::sync::Arc;
4
5use tonic::Request;
6use tonic::metadata::MetadataValue;
7use tonic::transport::{Channel, Endpoint};
8
9use crate::storage::{StorageError, StorageProvider, StorageResult};
10
11#[derive(Clone)]
12pub struct RelayStorageProvider {
13    endpoint: String,
14    pub token: Arc<tokio::sync::RwLock<Option<String>>>,
15}
16
17impl RelayStorageProvider {
18    pub fn new(endpoint: impl Into<String>) -> Self {
19        Self {
20            endpoint: endpoint.into(),
21            token: Arc::new(tokio::sync::RwLock::new(None)),
22        }
23    }
24
25    async fn connect(
26        &self,
27    ) -> StorageResult<vyn_relay::proto::vyn_relay_client::VynRelayClient<Channel>> {
28        let endpoint = Endpoint::from_shared(self.endpoint.clone())
29            .map_err(|err| StorageError::Transport(err.to_string()))?;
30        vyn_relay::proto::vyn_relay_client::VynRelayClient::connect(endpoint)
31            .await
32            .map_err(|err| StorageError::Transport(err.to_string()))
33    }
34
35    /// Perform the two-step challenge-response auth and store the session token.
36    pub async fn authenticate(
37        &self,
38        user_id: &str,
39        sign_fn: impl Fn(&[u8]) -> StorageResult<String>,
40    ) -> StorageResult<()> {
41        let mut client = self.connect().await?;
42
43        // Step 1: request challenge
44        let challenge_resp = client
45            .authenticate(Request::new(vyn_relay::proto::AuthRequest {
46                user_id: user_id.to_string(),
47                nonce: Vec::new(),
48                signature: String::new(),
49            }))
50            .await
51            .map_err(|e| StorageError::Transport(e.to_string()))?
52            .into_inner();
53
54        let nonce = challenge_resp.challenge_nonce;
55
56        // Step 2: sign nonce and send back
57        let signature = sign_fn(&nonce)?;
58        let auth_resp = client
59            .authenticate(Request::new(vyn_relay::proto::AuthRequest {
60                user_id: user_id.to_string(),
61                nonce: nonce.clone(),
62                signature,
63            }))
64            .await
65            .map_err(|e| StorageError::Transport(e.to_string()))?
66            .into_inner();
67
68        if !auth_resp.authenticated {
69            return Err(StorageError::Transport(
70                "relay authentication failed".to_string(),
71            ));
72        }
73
74        *self.token.write().await = Some(auth_resp.token);
75        Ok(())
76    }
77
78    /// Register identity on the relay (idempotent) then authenticate.
79    /// `vault_dir` is the `.vyn/` directory (contains `identity.toml`).
80    pub async fn authenticate_with_identity(&self, vault_dir: &Path) -> StorageResult<()> {
81        let identity = load_identity(vault_dir)?;
82        let private_key_path = identity.ssh_private_key.clone();
83        let public_key_path = identity.ssh_public_key.clone();
84        let user_id = identity.github_username.clone();
85
86        let public_key = std::fs::read_to_string(&public_key_path)
87            .map_err(|e| StorageError::Transport(format!("failed to read public key: {e}")))?;
88        let public_key = public_key.trim().to_string();
89
90        // Register identity on the relay (no-op if already registered with same key)
91        self.ensure_identity_registered(&user_id, &public_key, &private_key_path)
92            .await?;
93
94        let private_key_path2 = private_key_path.clone();
95        self.authenticate(&user_id, move |nonce| {
96            sign_nonce_with_ssh_key(nonce, Path::new(&private_key_path2))
97        })
98        .await
99    }
100
101    async fn ensure_identity_registered(
102        &self,
103        user_id: &str,
104        public_key: &str,
105        private_key_path: &str,
106    ) -> StorageResult<()> {
107        let registration_payload = format!("vyn-register:{user_id}:{public_key}");
108        let signature =
109            sign_nonce_with_ssh_key(registration_payload.as_bytes(), Path::new(private_key_path))?;
110
111        let mut client = self.connect().await?;
112        client
113            .register_identity(Request::new(vyn_relay::proto::RegisterRequest {
114                user_id: user_id.to_string(),
115                public_key: public_key.to_string(),
116                signature,
117            }))
118            .await
119            .map_err(|e| StorageError::Transport(e.to_string()))?;
120
121        Ok(())
122    }
123
124    async fn inject_token<T>(&self, mut request: Request<T>) -> StorageResult<Request<T>> {
125        if let Some(ref tok) = *self.token.read().await {
126            let val = MetadataValue::try_from(tok.as_str())
127                .map_err(|e| StorageError::Transport(e.to_string()))?;
128            request.metadata_mut().insert("x-vyn-token", val);
129        }
130        Ok(request)
131    }
132}
133
134impl StorageProvider for RelayStorageProvider {
135    async fn get_manifest(&self, project_id: &str) -> StorageResult<Option<Vec<u8>>> {
136        let mut client = self.connect().await?;
137        let response = client
138            .get_manifest(Request::new(vyn_relay::proto::GetManifestRequest {
139                project_id: project_id.to_string(),
140            }))
141            .await
142            .map_err(|err| StorageError::Transport(err.to_string()))?
143            .into_inner();
144
145        if response.found {
146            Ok(Some(response.payload))
147        } else {
148            Ok(None)
149        }
150    }
151
152    async fn put_manifest(&self, project_id: &str, manifest_payload: &[u8]) -> StorageResult<()> {
153        let mut client = self.connect().await?;
154        let req = self
155            .inject_token(Request::new(vyn_relay::proto::PutManifestRequest {
156                project_id: project_id.to_string(),
157                payload: manifest_payload.to_vec(),
158            }))
159            .await?;
160        client
161            .put_manifest(req)
162            .await
163            .map_err(|err| StorageError::Transport(err.to_string()))?;
164        Ok(())
165    }
166
167    async fn upload_blob(&self, hash: &str, data: Vec<u8>) -> StorageResult<()> {
168        let mut client = self.connect().await?;
169        let message = vyn_relay::proto::UploadBlobChunk {
170            hash: hash.to_string(),
171            chunk: data,
172        };
173
174        let mut req = Request::new(tokio_stream::iter(vec![message]));
175        if let Some(ref tok) = *self.token.read().await {
176            let val = MetadataValue::try_from(tok.as_str())
177                .map_err(|e| StorageError::Transport(e.to_string()))?;
178            req.metadata_mut().insert("x-vyn-token", val);
179        }
180        client
181            .upload_blob(req)
182            .await
183            .map_err(|err| StorageError::Transport(err.to_string()))?;
184        Ok(())
185    }
186
187    async fn download_blob(&self, hash: &str) -> StorageResult<Option<Vec<u8>>> {
188        let mut client = self.connect().await?;
189        let stream = client
190            .download_blob(Request::new(vyn_relay::proto::DownloadBlobRequest {
191                hash: hash.to_string(),
192            }))
193            .await
194            .map_err(|err| {
195                if err.code() == tonic::Code::NotFound {
196                    StorageError::NotFound
197                } else {
198                    StorageError::Transport(err.to_string())
199                }
200            });
201
202        if let Err(StorageError::NotFound) = stream {
203            return Ok(None);
204        }
205
206        let mut stream = stream?.into_inner();
207
208        let mut out = Vec::new();
209        while let Some(chunk) = stream
210            .message()
211            .await
212            .map_err(|err| StorageError::Transport(err.to_string()))?
213        {
214            out.extend_from_slice(&chunk.chunk);
215        }
216
217        Ok(Some(out))
218    }
219
220    async fn create_invite(
221        &self,
222        user_id: &str,
223        vault_id: &str,
224        payload: Vec<u8>,
225    ) -> StorageResult<()> {
226        let mut client = self.connect().await?;
227        let req = self
228            .inject_token(Request::new(vyn_relay::proto::CreateInviteRequest {
229                user_id: user_id.to_string(),
230                vault_id: vault_id.to_string(),
231                payload,
232            }))
233            .await?;
234        client
235            .create_invite(req)
236            .await
237            .map_err(|err| StorageError::Transport(err.to_string()))?;
238        Ok(())
239    }
240
241    async fn get_invites(&self, user_id: &str, vault_id: &str) -> StorageResult<Vec<Vec<u8>>> {
242        let mut client = self.connect().await?;
243        let response = client
244            .get_invites(Request::new(vyn_relay::proto::GetInvitesRequest {
245                user_id: user_id.to_string(),
246                vault_id: vault_id.to_string(),
247            }))
248            .await
249            .map_err(|err| StorageError::Transport(err.to_string()))?
250            .into_inner();
251        Ok(response.payloads)
252    }
253}
254
255impl RelayStorageProvider {
256    /// Lists all vault IDs on the relay (requires auth token).
257    pub async fn list_vaults(&self) -> StorageResult<Vec<String>> {
258        let mut client = self.connect().await?;
259        let mut request = tonic::Request::new(vyn_relay::proto::ListVaultsRequest {});
260        request = self.inject_token(request).await?;
261        let response = client
262            .list_vaults(request)
263            .await
264            .map_err(|err| StorageError::Transport(err.to_string()))?
265            .into_inner();
266        Ok(response.vault_ids)
267    }
268
269    /// Lists all blobs on the relay (requires auth token).
270    pub async fn list_blobs(&self) -> StorageResult<Vec<(String, u64)>> {
271        let mut client = self.connect().await?;
272        let mut request = tonic::Request::new(vyn_relay::proto::ListBlobsRequest {
273            vault_id: String::new(),
274        });
275        request = self.inject_token(request).await?;
276        let response = client
277            .list_blobs(request)
278            .await
279            .map_err(|err| StorageError::Transport(err.to_string()))?
280            .into_inner();
281        Ok(response
282            .blobs
283            .into_iter()
284            .map(|b| (b.sha256, b.size_bytes))
285            .collect())
286    }
287}
288
/// Values read from `identity.toml` in the vault directory.
struct IdentityConfig {
    /// Value of the `github_username` key; used as the relay `user_id`.
    github_username: String,
    /// Value of the `ssh_private_key` key; a path handed to `ssh-keygen -f` for signing.
    ssh_private_key: String,
    /// Value of the `ssh_public_key` key; a path to the public key file that is read
    /// and sent to the relay on registration.
    ssh_public_key: String,
}
294
295fn load_identity(vault_dir: &Path) -> StorageResult<IdentityConfig> {
296    let path = vault_dir.join("identity.toml");
297    let text = std::fs::read_to_string(&path)
298        .map_err(|e| StorageError::Transport(format!("failed to read identity.toml: {e}")))?;
299    let github_username = parse_toml_string(&text, "github_username").ok_or_else(|| {
300        StorageError::Transport("missing github_username in identity.toml".into())
301    })?;
302    let ssh_private_key = parse_toml_string(&text, "ssh_private_key").ok_or_else(|| {
303        StorageError::Transport("missing ssh_private_key in identity.toml".into())
304    })?;
305    let ssh_public_key = parse_toml_string(&text, "ssh_public_key")
306        .ok_or_else(|| StorageError::Transport("missing ssh_public_key in identity.toml".into()))?;
307    Ok(IdentityConfig {
308        github_username,
309        ssh_private_key,
310        ssh_public_key,
311    })
312}
313
/// Extracts the value of a top-level `key = "value"` line from `text`.
///
/// This is a minimal line-based scan, not a TOML parser: each line is trimmed,
/// must start with `key` followed (after optional whitespace) by `=`, and the
/// remainder is returned with surrounding whitespace and `"` characters removed.
/// The first matching line wins.
///
/// Lines that merely share `key` as a prefix of a longer key (for example
/// `ssh_private_key_backup` when looking up `ssh_private_key`) are skipped and
/// the scan continues with the following lines.
///
/// Returns `None` when no line matches.
fn parse_toml_string(text: &str, key: &str) -> Option<String> {
    text.lines().find_map(|line| {
        // `?` here only rejects the current line; `find_map` moves on to the next.
        let after_key = line.trim().strip_prefix(key)?;
        let raw_value = after_key.trim_start().strip_prefix('=')?;
        Some(raw_value.trim().trim_matches('"').to_string())
    })
}
325
326fn sign_nonce_with_ssh_key(nonce: &[u8], private_key: &Path) -> StorageResult<String> {
327    let tmp = tempfile::TempDir::new()
328        .map_err(|e| StorageError::Transport(format!("failed to create temp dir: {e}")))?;
329    let nonce_file = tmp.path().join("nonce");
330    std::fs::write(&nonce_file, nonce)
331        .map_err(|e| StorageError::Transport(format!("failed to write nonce: {e}")))?;
332
333    let status = Command::new("ssh-keygen")
334        .args([
335            "-Y",
336            "sign",
337            "-f",
338            private_key.to_str().unwrap_or(""),
339            "-n",
340            "vyn",
341            nonce_file.to_str().unwrap_or(""),
342        ])
343        .stdout(Stdio::null())
344        .stderr(Stdio::null())
345        .status()
346        .map_err(|e| StorageError::Transport(format!("failed to run ssh-keygen: {e}")))?;
347
348    if !status.success() {
349        return Err(StorageError::Transport("ssh-keygen signing failed".into()));
350    }
351
352    let sig_file = tmp.path().join("nonce.sig");
353    let sig = std::fs::read_to_string(&sig_file)
354        .map_err(|e| StorageError::Transport(format!("failed to read signature: {e}")))?;
355    Ok(sig)
356}
357
#[cfg(test)]
mod tests {
    use crate::crypto::secret_bytes;
    use crate::manifest::Manifest;
    use crate::storage::{StorageProvider, decrypt_manifest, encrypt_manifest};

    use super::RelayStorageProvider;

    /// Round-trips a blob, an encrypted manifest, and an invite through a relay
    /// served in-process on an ephemeral localhost port.
    #[tokio::test]
    async fn relay_roundtrip() {
        const PAUSE: std::time::Duration = std::time::Duration::from_millis(100);

        // Bind to port 0 so the OS picks a free port, then hand the listener to the relay.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind test port");
        let port = listener.local_addr().expect("read local addr").port();

        let data_dir = std::env::temp_dir().join(format!("vyn-relay-it-{}", uuid::Uuid::new_v4()));
        let relay_data_dir = data_dir.to_string_lossy().to_string();

        let relay_task = tokio::spawn(async move {
            let _ = vyn_relay::serve_with_listener(listener, relay_data_dir).await;
        });

        tokio::time::sleep(PAUSE).await;

        let provider = RelayStorageProvider::new(format!("http://127.0.0.1:{port}"));
        // Inject the test bypass token so write RPCs pass auth in test builds.
        *provider.token.write().await = Some("test-bypass".to_string());

        // The relay may not be accepting connections yet: try up to 20 times,
        // pausing after each failed attempt.
        let mut attempts_left = 20;
        let uploaded = loop {
            if attempts_left == 0 {
                break false;
            }
            attempts_left -= 1;

            let attempt = provider
                .upload_blob("blob123", b"hello-relay".to_vec())
                .await;
            if attempt.is_ok() {
                break true;
            }
            tokio::time::sleep(PAUSE).await;
        };
        assert!(uploaded, "blob upload should succeed once relay is ready");

        let blob = provider
            .download_blob("blob123")
            .await
            .expect("blob download should succeed")
            .expect("blob should exist");
        assert_eq!(blob, b"hello-relay");

        // Manifest: encrypt, push, pull, decrypt.
        let key = secret_bytes(vec![3u8; 32]);
        let manifest = Manifest {
            version: 1,
            files: vec![],
        };
        let payload = encrypt_manifest(&manifest, &key).expect("encrypt manifest");
        provider
            .put_manifest("vault-it", &payload)
            .await
            .expect("put manifest should succeed");
        let pulled = provider
            .get_manifest("vault-it")
            .await
            .expect("get manifest should succeed")
            .expect("manifest should exist");
        let restored = decrypt_manifest(&pulled, &key).expect("decrypt manifest");
        assert_eq!(restored.version, 1);

        // Invite: create one and read it back.
        provider
            .create_invite("alice", "vault-it", b"invite-data".to_vec())
            .await
            .expect("create invite should succeed");
        let invites = provider
            .get_invites("alice", "vault-it")
            .await
            .expect("get invites should succeed");
        assert_eq!(invites.len(), 1);
        assert_eq!(invites[0], b"invite-data");

        relay_task.abort();
        let _ = std::fs::remove_dir_all(data_dir);
    }
}