posemesh_compute_node/storage/
input.rs1use super::client::DomainClient;
2use anyhow::{anyhow, Context, Result};
3use async_trait::async_trait;
4use tokio::fs;
5
6#[derive(Clone)]
8pub struct DomainInput {
9 client: DomainClient,
10 domain_id: String,
11}
12impl DomainInput {
13 pub fn new(client: DomainClient, domain_id: String) -> Self {
14 Self { client, domain_id }
15 }
16}
17
18#[async_trait]
19impl compute_runner_api::InputSource for DomainInput {
20 async fn get_bytes_by_cid(&self, cid: &str) -> Result<Vec<u8>> {
21 let materialized = self.materialize_cid_with_meta(cid).await?;
22 let source_path = materialized
23 .extracted_paths
24 .first()
25 .cloned()
26 .unwrap_or_else(|| materialized.path.clone());
27 let bytes = fs::read(&source_path)
28 .await
29 .with_context(|| format!("read domain download {}", source_path.display()))?;
30 Ok(bytes)
31 }
32
33 async fn materialize_cid_to_temp(&self, cid: &str) -> Result<std::path::PathBuf> {
34 let materialized = self.materialize_cid_with_meta(cid).await?;
35 Ok(materialized.path)
36 }
37
38 async fn materialize_cid_with_meta(
39 &self,
40 cid: &str,
41 ) -> Result<compute_runner_api::MaterializedInput> {
42 let mut parts = self
43 .client
44 .download_cid(&self.domain_id, cid)
45 .await
46 .map_err(|e| anyhow!(e))?;
47 if parts.is_empty() {
48 return Err(anyhow!("domain response missing data for {}", cid));
49 }
50 let primary = parts.remove(0);
53 let related_files = parts.into_iter().map(|p| p.path).collect();
54
55 Ok(compute_runner_api::MaterializedInput {
56 cid: cid.to_string(),
57 path: primary.path,
58 data_id: primary.id,
59 name: primary.name,
60 data_type: primary.data_type,
61 domain_id: primary.domain_id,
62 root_dir: primary.root,
63 related_files,
64 extracted_paths: primary.extracted_paths,
65 })
66 }
67}