posemesh_compute_node_runner_api/
runner.rs

1use crate::types::LeaseEnvelope;
2use anyhow::Result;
3use async_trait::async_trait;
4use std::path::{Path, PathBuf};
5
6/// Result of materializing a CID, including discovered metadata from the domain server.
7#[derive(Debug, Clone, PartialEq, Eq)]
8pub struct MaterializedInput {
9    pub cid: String,
10    pub path: PathBuf,
11    pub data_id: Option<String>,
12    pub name: Option<String>,
13    pub data_type: Option<String>,
14    pub domain_id: Option<String>,
15    pub root_dir: PathBuf,
16    pub related_files: Vec<PathBuf>,
17    pub extracted_paths: Vec<PathBuf>,
18}
19
20impl MaterializedInput {
21    pub fn new(cid: impl Into<String>, path: PathBuf) -> Self {
22        let root_dir = path
23            .parent()
24            .map(|p| p.to_path_buf())
25            .unwrap_or_else(|| path.clone());
26        Self {
27            cid: cid.into(),
28            path,
29            data_id: None,
30            name: None,
31            data_type: None,
32            domain_id: None,
33            root_dir,
34            related_files: Vec::new(),
35            extracted_paths: Vec::new(),
36        }
37    }
38}
39
40/// Source of input artifacts for a task.
41#[async_trait]
42pub trait InputSource: Send + Sync {
43    /// Fetch object bytes by CID.
44    async fn get_bytes_by_cid(&self, cid: &str) -> Result<Vec<u8>>;
45
46    /// Materialize CID to a temporary file path and return its location.
47    async fn materialize_cid_to_temp(&self, cid: &str) -> Result<std::path::PathBuf>;
48
49    /// Materialize CID and include optional metadata describing the source artifact.
50    async fn materialize_cid_with_meta(&self, cid: &str) -> Result<MaterializedInput> {
51        let path = self.materialize_cid_to_temp(cid).await?;
52        Ok(MaterializedInput::new(cid, path))
53    }
54}
55
56/// Destination for task output artifacts.
57#[async_trait]
58pub trait ArtifactSink: Send + Sync {
59    /// Upload raw bytes under a relative path within the outputs prefix.
60    async fn put_bytes(&self, rel_path: &str, bytes: &[u8]) -> Result<()>;
61
62    /// Upload a file from the local filesystem under a relative path.
63    async fn put_file(&self, rel_path: &str, file_path: &std::path::Path) -> Result<()>;
64
65    /// Optional: open a multipart upload writer for a large artifact.
66    async fn open_multipart(&self, _rel_path: &str) -> Result<Box<dyn MultipartUpload>> {
67        Err(anyhow::anyhow!("multipart not supported"))
68    }
69
70    /// Upload an artifact with explicit Domain metadata.
71    async fn put_domain_artifact(
72        &self,
73        _request: DomainArtifactRequest<'_>,
74    ) -> Result<Option<String>> {
75        Err(anyhow::anyhow!("domain artifact uploads not supported"))
76    }
77}
78
79/// Handle returned by `ArtifactSink::open_multipart`.
80#[async_trait]
81pub trait MultipartUpload: Send + Sync {
82    /// Write a chunk of the artifact.
83    async fn write_chunk(&mut self, chunk: &[u8]) -> Result<()>;
84    /// Finish and commit the artifact.
85    async fn finish(self: Box<Self>) -> Result<()>;
86}
87
88/// Metadata required to upload an artifact with Domain semantics.
89pub struct DomainArtifactRequest<'a> {
90    pub rel_path: &'a str,
91    pub name: &'a str,
92    pub data_type: &'a str,
93    pub existing_id: Option<&'a str>,
94    pub content: DomainArtifactContent<'a>,
95}
96
97/// Describes the content of a `DomainArtifactRequest`.
98pub enum DomainArtifactContent<'a> {
99    Bytes(&'a [u8]),
100    File(&'a Path),
101}
102
103/// Control-plane interface for a running task.
104#[async_trait]
105pub trait ControlPlane: Send + Sync {
106    /// Returns true if the task has been cancelled.
107    async fn is_cancelled(&self) -> bool;
108
109    /// Report progress to the engine; opaque JSON accepted to avoid coupling.
110    async fn progress(&self, value: serde_json::Value) -> Result<()>;
111
112    /// Log an event with fields to be attached to heartbeats.
113    async fn log_event(&self, fields: serde_json::Value) -> Result<()>;
114}
115
116/// Task context passed to runners.
117pub struct TaskCtx<'a> {
118    pub lease: &'a LeaseEnvelope,
119    pub input: &'a dyn InputSource,
120    pub output: &'a dyn ArtifactSink,
121    pub ctrl: &'a dyn ControlPlane,
122}
123
124/// Runner entrypoint.
125#[async_trait]
126pub trait Runner: Send + Sync {
127    /// Capability string this runner implements (e.g., "/reconstruction/legacy/v1").
128    fn capability(&self) -> &'static str;
129
130    /// Execute the task.
131    async fn run(&self, ctx: TaskCtx<'_>) -> Result<()>;
132}