aardvark_core/
invocation.rs

1//! Invocation descriptor and budget definitions.
2//!
3//! These structures allow hosts to describe the contract for a Python bundle
4//! without leaking platform-specific manifest details into the runtime. They
5//! intentionally stay lightweight so adapters can extend them with
6//! `metadata` fields as needed.
7
8use serde::{Deserialize, Serialize};
9use serde_json::Value as JsonValue;
10
11use crate::runtime_language::RuntimeLanguage;
12
13/// Describes the runtime contract for a single invocation.
14#[derive(Clone, Debug, Serialize, Deserialize)]
15pub struct InvocationDescriptor {
16    entrypoint: String,
17    /// Optional language override for the invocation.
18    #[serde(default, skip_serializing_if = "Option::is_none")]
19    pub language: Option<RuntimeLanguage>,
20    /// Inputs passed positionally to the Python handler.
21    #[serde(default)]
22    pub inputs: Vec<FieldDescriptor>,
23    /// Outputs captured from the handler.
24    #[serde(default)]
25    pub outputs: Vec<FieldDescriptor>,
26    /// Free-form JSON for adapters that need extra parameters.
27    #[serde(default, skip_serializing_if = "Option::is_none")]
28    pub params: Option<JsonValue>,
29    /// Optional rolling window configuration for stateful invocations.
30    #[serde(default, skip_serializing_if = "Option::is_none")]
31    pub window: Option<WindowConfig>,
32    /// Resource budgets applied to the invocation.
33    #[serde(default)]
34    pub limits: InvocationLimits,
35}
36
37impl InvocationDescriptor {
38    /// Create a descriptor with a fully-specified entrypoint and sensible defaults.
39    pub fn new(entrypoint: impl Into<String>) -> Self {
40        let entrypoint = entrypoint.into();
41        Self {
42            entrypoint: sanitize_entrypoint(entrypoint),
43            language: None,
44            inputs: Vec::new(),
45            outputs: Vec::new(),
46            params: None,
47            window: None,
48            limits: InvocationLimits::default(),
49        }
50    }
51
52    /// Convenience constructor for legacy usage – only the entrypoint is provided.
53    pub fn trivial(entrypoint: impl Into<String>) -> Self {
54        Self::new(entrypoint)
55    }
56
57    /// Returns the canonical entrypoint (module:function path or script).
58    pub fn entrypoint(&self) -> &str {
59        &self.entrypoint
60    }
61
62    /// Returns a descriptor clone with the provided limits applied.
63    pub fn with_limits(mut self, limits: InvocationLimits) -> Self {
64        self.limits = limits;
65        self
66    }
67
68    /// Ensure the descriptor is well-formed.
69    pub fn validate(&self) -> Result<(), DescriptorError> {
70        if self.entrypoint.trim().is_empty() {
71            return Err(DescriptorError::InvalidEntrypoint);
72        }
73        for field in self.inputs.iter().chain(self.outputs.iter()) {
74            field.validate()?;
75        }
76        self.limits.validate()?;
77        if let Some(window) = &self.window {
78            window.validate()?;
79        }
80        Ok(())
81    }
82}
83
84/// Simple descriptor for an input or output field.
85#[derive(Clone, Debug, Serialize, Deserialize)]
86pub struct FieldDescriptor {
87    /// Field name (also used for logging instrumentation).
88    pub name: String,
89    /// Optional type hint understood by host adapters.
90    #[serde(default, skip_serializing_if = "Option::is_none")]
91    pub type_tag: Option<String>,
92    /// Optional metadata, typically used for decoder configuration.
93    #[serde(default, skip_serializing_if = "Option::is_none")]
94    pub metadata: Option<JsonValue>,
95}
96
97impl FieldDescriptor {
98    fn validate(&self) -> Result<(), DescriptorError> {
99        if self.name.trim().is_empty() {
100            return Err(DescriptorError::InvalidFieldName);
101        }
102        Ok(())
103    }
104}
105
106/// Optional rolling window configuration for descriptor-aware invocations.
107#[derive(Clone, Debug, Default, Serialize, Deserialize)]
108pub struct WindowConfig {
109    /// Window size (number of events included per invocation).
110    #[serde(default, skip_serializing_if = "Option::is_none")]
111    pub size: Option<u64>,
112    /// Step between successive windows.
113    #[serde(default, skip_serializing_if = "Option::is_none")]
114    pub step: Option<u64>,
115    /// Optional stride to skip records within the window.
116    #[serde(default, skip_serializing_if = "Option::is_none")]
117    pub stride: Option<u64>,
118}
119
120impl WindowConfig {
121    fn validate(&self) -> Result<(), DescriptorError> {
122        for value in [self.size, self.step, self.stride].into_iter().flatten() {
123            if value == 0 {
124                return Err(DescriptorError::InvalidWindowConfig);
125            }
126        }
127        Ok(())
128    }
129}
130
131/// Execution budget configuration derived from the descriptor.
132#[derive(Clone, Debug, Default, Serialize, Deserialize)]
133pub struct InvocationLimits {
134    /// Maximum wall-clock time in milliseconds.
135    #[serde(default, skip_serializing_if = "Option::is_none")]
136    pub wall_ms: Option<u64>,
137    /// Maximum heap usage in MiB as reported by V8.
138    #[serde(default, skip_serializing_if = "Option::is_none")]
139    pub heap_mb: Option<u64>,
140    /// Maximum CPU time in milliseconds (per-thread).
141    #[serde(default, skip_serializing_if = "Option::is_none", alias = "cpu_fuel")]
142    pub cpu_ms: Option<u64>,
143}
144
145impl InvocationLimits {
146    fn validate(&self) -> Result<(), DescriptorError> {
147        if let Some(0) = self.wall_ms {
148            return Err(DescriptorError::InvalidLimits);
149        }
150        if let Some(0) = self.heap_mb {
151            return Err(DescriptorError::InvalidLimits);
152        }
153        if let Some(0) = self.cpu_ms {
154            return Err(DescriptorError::InvalidLimits);
155        }
156        Ok(())
157    }
158
159    /// Merge the descriptor limits with an optional override, picking the tighter budget.
160    pub fn merged_with(&self, override_limits: Option<&InvocationLimits>) -> InvocationLimits {
161        fn merge(primary: Option<u64>, override_value: Option<u64>) -> Option<u64> {
162            match (primary, override_value) {
163                (Some(a), Some(b)) => Some(a.min(b)),
164                (Some(a), None) => Some(a),
165                (None, Some(b)) => Some(b),
166                (None, None) => None,
167            }
168        }
169
170        if let Some(override_limits) = override_limits {
171            InvocationLimits {
172                wall_ms: merge(self.wall_ms, override_limits.wall_ms),
173                heap_mb: merge(self.heap_mb, override_limits.heap_mb),
174                cpu_ms: merge(self.cpu_ms, override_limits.cpu_ms),
175            }
176        } else {
177            self.clone()
178        }
179    }
180}
181
182/// Descriptor validation failures surfaced to callers.
183#[derive(Debug, thiserror::Error)]
184pub enum DescriptorError {
185    #[error("descriptor entrypoint cannot be empty")]
186    InvalidEntrypoint,
187    #[error("descriptor field name cannot be empty")]
188    InvalidFieldName,
189    #[error("descriptor window configuration cannot contain zero values")]
190    InvalidWindowConfig,
191    #[error("descriptor limits must be positive when specified")]
192    InvalidLimits,
193}
194
/// Strips leading and trailing whitespace from an entrypoint string.
///
/// Interior whitespace is left untouched; an all-whitespace input becomes the
/// empty string.
fn sanitize_entrypoint(entrypoint: String) -> String {
    let trimmed: &str = entrypoint.trim();
    String::from(trimmed)
}
198
#[cfg(test)]
mod tests {
    use super::{
        DescriptorError, FieldDescriptor, InvocationDescriptor, InvocationLimits, WindowConfig,
    };

    /// Builds a field descriptor that carries only a name.
    fn named_field(name: &str) -> FieldDescriptor {
        FieldDescriptor {
            name: name.into(),
            type_tag: None,
            metadata: None,
        }
    }

    #[test]
    fn trivial_descriptor_sanitizes_entrypoint() {
        let descriptor = InvocationDescriptor::trivial("  module:func  ");
        assert_eq!(descriptor.entrypoint(), "module:func");
        assert!(descriptor.validate().is_ok());
    }

    #[test]
    fn validate_rejects_empty_entrypoint() {
        let descriptor = InvocationDescriptor::trivial("   ");
        assert!(matches!(
            descriptor.validate(),
            Err(DescriptorError::InvalidEntrypoint)
        ));
    }

    #[test]
    fn validate_accepts_multiple_outputs() {
        let mut descriptor = InvocationDescriptor::trivial("pkg:handler");
        descriptor.outputs.push(named_field("first"));
        descriptor.outputs.push(named_field("second"));
        assert!(descriptor.validate().is_ok());
    }

    #[test]
    fn validate_rejects_blank_field_name() {
        // Blank names must be caught on both the input and the output side.
        let mut with_bad_input = InvocationDescriptor::trivial("pkg:handler");
        with_bad_input.inputs.push(named_field("   "));
        assert!(matches!(
            with_bad_input.validate(),
            Err(DescriptorError::InvalidFieldName)
        ));

        let mut with_bad_output = InvocationDescriptor::trivial("pkg:handler");
        with_bad_output.outputs.push(named_field(""));
        assert!(matches!(
            with_bad_output.validate(),
            Err(DescriptorError::InvalidFieldName)
        ));
    }

    #[test]
    fn validate_rejects_zero_limits() {
        let zeroed = [
            InvocationLimits {
                wall_ms: Some(0),
                ..InvocationLimits::default()
            },
            InvocationLimits {
                heap_mb: Some(0),
                ..InvocationLimits::default()
            },
            InvocationLimits {
                cpu_ms: Some(0),
                ..InvocationLimits::default()
            },
        ];
        for limits in zeroed {
            let descriptor = InvocationDescriptor::trivial("pkg:handler").with_limits(limits);
            assert!(matches!(
                descriptor.validate(),
                Err(DescriptorError::InvalidLimits)
            ));
        }
    }

    #[test]
    fn validate_rejects_zero_window_values() {
        let mut descriptor = InvocationDescriptor::trivial("pkg:handler");
        descriptor.window = Some(WindowConfig {
            size: Some(8),
            step: Some(0),
            stride: None,
        });
        assert!(matches!(
            descriptor.validate(),
            Err(DescriptorError::InvalidWindowConfig)
        ));

        // The same window with a positive step is accepted.
        descriptor.window = Some(WindowConfig {
            size: Some(8),
            step: Some(4),
            stride: None,
        });
        assert!(descriptor.validate().is_ok());
    }

    #[test]
    fn limits_merge_prefers_tighter_budget() {
        let base = InvocationLimits {
            wall_ms: Some(2_000),
            heap_mb: Some(512),
            cpu_ms: None,
        };
        let override_limits = InvocationLimits {
            wall_ms: Some(1_000),
            heap_mb: Some(1_024),
            cpu_ms: Some(10_000),
        };
        let merged = base.merged_with(Some(&override_limits));
        assert_eq!(merged.wall_ms, Some(1_000));
        assert_eq!(merged.heap_mb, Some(512));
        assert_eq!(merged.cpu_ms, Some(10_000));
    }

    #[test]
    fn limits_merge_without_override_keeps_base() {
        let base = InvocationLimits {
            wall_ms: Some(2_000),
            heap_mb: None,
            cpu_ms: Some(500),
        };
        let merged = base.merged_with(None);
        assert_eq!(merged.wall_ms, Some(2_000));
        assert_eq!(merged.heap_mb, None);
        assert_eq!(merged.cpu_ms, Some(500));
    }
}