posemesh_compute_node/storage/
input.rs1use super::client::DomainClient;
2use anyhow::{anyhow, Context, Result};
3use async_trait::async_trait;
4use tokio::fs;
5
6#[derive(Clone)]
8pub struct DomainInput {
9 client: DomainClient,
10}
11impl DomainInput {
12 pub fn new(client: DomainClient) -> Self {
13 Self { client }
14 }
15}
16
17#[async_trait]
18impl compute_runner_api::InputSource for DomainInput {
19 async fn get_bytes_by_cid(&self, cid: &str) -> Result<Vec<u8>> {
20 let materialized = self.materialize_cid_with_meta(cid).await?;
21 let source_path = materialized
22 .extracted_paths
23 .first()
24 .cloned()
25 .unwrap_or_else(|| materialized.path.clone());
26 let bytes = fs::read(&source_path)
27 .await
28 .with_context(|| format!("read domain download {}", source_path.display()))?;
29 Ok(bytes)
30 }
31
32 async fn materialize_cid_to_temp(&self, cid: &str) -> Result<std::path::PathBuf> {
33 let materialized = self.materialize_cid_with_meta(cid).await?;
34 Ok(materialized.path)
35 }
36
37 async fn materialize_cid_with_meta(
38 &self,
39 cid: &str,
40 ) -> Result<compute_runner_api::MaterializedInput> {
41 let mut parts = self
42 .client
43 .download_uri(cid)
44 .await
45 .map_err(|e| anyhow!(e))?;
46 if parts.is_empty() {
47 return Err(anyhow!("domain response missing data for {}", cid));
48 }
49 let primary_index = parts
50 .iter()
51 .position(|p| p.data_type.as_deref() == Some("refined_scan_zip"))
52 .unwrap_or(0);
53 let primary = parts.remove(primary_index);
54 let related_files = parts.into_iter().map(|p| p.path).collect();
55
56 Ok(compute_runner_api::MaterializedInput {
57 cid: cid.to_string(),
58 path: primary.path,
59 data_id: primary.id,
60 name: primary.name,
61 data_type: primary.data_type,
62 domain_id: primary.domain_id,
63 root_dir: primary.root,
64 related_files,
65 extracted_paths: primary.extracted_paths,
66 })
67 }
68}