posemesh_compute_node_runner_api/
runner.rs1use crate::types::LeaseEnvelope;
2use anyhow::Result;
3use async_trait::async_trait;
4use std::path::{Path, PathBuf};
5
6#[derive(Debug, Clone, PartialEq, Eq)]
8pub struct MaterializedInput {
9 pub cid: String,
10 pub path: PathBuf,
11 pub data_id: Option<String>,
12 pub name: Option<String>,
13 pub data_type: Option<String>,
14 pub domain_id: Option<String>,
15 pub root_dir: PathBuf,
16 pub related_files: Vec<PathBuf>,
17 pub extracted_paths: Vec<PathBuf>,
18}
19
20impl MaterializedInput {
21 pub fn new(cid: impl Into<String>, path: PathBuf) -> Self {
22 let root_dir = path
23 .parent()
24 .map(|p| p.to_path_buf())
25 .unwrap_or_else(|| path.clone());
26 Self {
27 cid: cid.into(),
28 path,
29 data_id: None,
30 name: None,
31 data_type: None,
32 domain_id: None,
33 root_dir,
34 related_files: Vec::new(),
35 extracted_paths: Vec::new(),
36 }
37 }
38}
39
40#[async_trait]
42pub trait InputSource: Send + Sync {
43 async fn get_bytes_by_cid(&self, cid: &str) -> Result<Vec<u8>>;
45
46 async fn materialize_cid_to_temp(&self, cid: &str) -> Result<std::path::PathBuf>;
48
49 async fn materialize_cid_with_meta(&self, cid: &str) -> Result<MaterializedInput> {
51 let path = self.materialize_cid_to_temp(cid).await?;
52 Ok(MaterializedInput::new(cid, path))
53 }
54}
55
56#[async_trait]
58pub trait ArtifactSink: Send + Sync {
59 async fn put_bytes(&self, rel_path: &str, bytes: &[u8]) -> Result<()>;
61
62 async fn put_file(&self, rel_path: &str, file_path: &std::path::Path) -> Result<()>;
64
65 async fn open_multipart(&self, _rel_path: &str) -> Result<Box<dyn MultipartUpload>> {
67 Err(anyhow::anyhow!("multipart not supported"))
68 }
69
70 async fn put_domain_artifact(
72 &self,
73 _request: DomainArtifactRequest<'_>,
74 ) -> Result<Option<String>> {
75 Err(anyhow::anyhow!("domain artifact uploads not supported"))
76 }
77}
78
79#[async_trait]
81pub trait MultipartUpload: Send + Sync {
82 async fn write_chunk(&mut self, chunk: &[u8]) -> Result<()>;
84 async fn finish(self: Box<Self>) -> Result<()>;
86}
87
88pub struct DomainArtifactRequest<'a> {
90 pub rel_path: &'a str,
91 pub name: &'a str,
92 pub data_type: &'a str,
93 pub existing_id: Option<&'a str>,
94 pub content: DomainArtifactContent<'a>,
95}
96
97pub enum DomainArtifactContent<'a> {
99 Bytes(&'a [u8]),
100 File(&'a Path),
101}
102
103#[async_trait]
105pub trait ControlPlane: Send + Sync {
106 async fn is_cancelled(&self) -> bool;
108
109 async fn progress(&self, value: serde_json::Value) -> Result<()>;
111
112 async fn log_event(&self, fields: serde_json::Value) -> Result<()>;
114}
115
116pub struct TaskCtx<'a> {
118 pub lease: &'a LeaseEnvelope,
119 pub input: &'a dyn InputSource,
120 pub output: &'a dyn ArtifactSink,
121 pub ctrl: &'a dyn ControlPlane,
122}
123
124#[async_trait]
126pub trait Runner: Send + Sync {
127 fn capability(&self) -> &'static str;
129
130 async fn run(&self, ctx: TaskCtx<'_>) -> Result<()>;
132}