Skip to main content

greentic_runner_host/runner/
operator.rs

1use axum::{
2    body::{Body, to_bytes},
3    http::{HeaderMap, Response, StatusCode},
4};
5use serde::{Deserialize, Serialize};
6use serde_cbor;
7use serde_json::{Map, Value, json};
8use std::sync::atomic::Ordering;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use tracing::{Level, span};
12
13use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
14use crate::operator_registry::OperatorResolveError;
15use crate::provider::ProviderBinding;
16use crate::routing::TenantRuntimeHandle;
17use crate::runtime::TenantRuntime;
18
/// Media type used as the `content-type` header value on CBOR responses.
const CONTENT_TYPE_CBOR: &str = "application/cbor";
/// Upper bound, in bytes, on the request body read by the HTTP handler (16 MiB).
const OPERATOR_BODY_LIMIT: usize = 16 * 1024 * 1024;
21
/// Operator-facing invocation payload (CBOR envelope).
///
/// Only `op_id` and `payload` are required; every other field falls back to
/// its default when absent from the encoded request.
#[derive(Debug, Deserialize)]
pub struct OperatorRequest {
    /// Tenant the caller expects to run under. When present it must equal the
    /// tenant of the runtime chosen by routing, otherwise the invoke is
    /// rejected with `TenantNotAllowed`.
    #[serde(default)]
    pub tenant_id: Option<String>,
    /// Provider identifier used to resolve the operation. At least one of
    /// `provider_id` / `provider_type` must be set.
    #[serde(default)]
    pub provider_id: Option<String>,
    /// Provider type used to resolve the operation. At least one of
    /// `provider_id` / `provider_type` must be set.
    #[serde(default)]
    pub provider_type: Option<String>,
    /// Optional pack pin: when present it must equal the part of the resolved
    /// binding's pack reference that precedes `@`.
    #[serde(default)]
    pub pack_id: Option<String>,
    /// Identifier of the operation to invoke.
    pub op_id: String,
    /// Trace identifier copied into the component tenant context.
    #[serde(default)]
    pub trace_id: Option<String>,
    /// Correlation identifier copied into the component tenant context, where
    /// it is also reused as the idempotency key.
    #[serde(default)]
    pub correlation_id: Option<String>,
    /// Timeout in milliseconds, converted into an absolute deadline for the
    /// component tenant context.
    #[serde(default)]
    pub timeout: Option<u64>,
    /// Free-form flags. NOTE(review): not read anywhere in this file — confirm
    /// which consumer interprets them.
    #[serde(default)]
    pub flags: Vec<String>,
    /// Requested operation version. NOTE(review): not read anywhere in this
    /// file — confirm whether it is meant to be validated.
    #[serde(default)]
    pub op_version: Option<String>,
    /// Schema hash supplied by the caller. NOTE(review): not read anywhere in
    /// this file — confirm whether it is meant to be validated.
    #[serde(default)]
    pub schema_hash: Option<String>,
    /// Operation input plus attachment references.
    pub payload: OperatorPayload,
}
48
49impl OperatorRequest {
50    pub fn from_cbor(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
51        serde_cbor::from_slice(bytes)
52    }
53}
54
/// Input carried by an [`OperatorRequest`]: the CBOR-encoded operation input
/// and any attachment references to resolve before invoking.
#[derive(Debug, Deserialize)]
pub struct OperatorPayload {
    /// CBOR-encoded operation input; an empty value is decoded as JSON `null`.
    ///
    /// NOTE(review): without `serde_bytes`, serde decodes `Vec<u8>` as a
    /// sequence of integers, so a CBOR byte string in this position may be
    /// rejected — confirm the encoding clients actually send.
    #[serde(default)]
    #[serde(rename = "cbor_input")]
    pub cbor_input: Vec<u8>,
    /// Attachment references; those with recognised metadata are resolved and
    /// merged into the operation input.
    #[serde(default)]
    pub attachments: Vec<AttachmentRef>,
}
63
/// Reference to an attachment supplied alongside the operation input.
#[derive(Debug, Deserialize)]
pub struct AttachmentRef {
    /// Caller-chosen identifier. NOTE(review): not read anywhere in this file.
    pub id: String,
    /// Optional JSON metadata; its `type` entry decides how (and whether) the
    /// attachment is resolved.
    #[serde(default)]
    pub metadata: Option<Value>,
}
70
/// Operator response envelope serialized back to CBOR.
///
/// NOTE(review): `to_cbor` uses serde_cbor's packed format, which presumably
/// keys fields by position rather than by name — treat the field order below
/// as part of the wire format until confirmed otherwise.
#[derive(Debug, Serialize)]
pub struct OperatorResponse {
    /// Overall outcome of the invocation.
    pub status: OperatorStatus,
    /// CBOR-encoded operation result; omitted from the encoding when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cbor_output: Option<Vec<u8>>,
    /// Failure description; omitted from the encoding when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<OperatorError>,
}
80
81impl OperatorResponse {
82    pub fn ok(output: Vec<u8>) -> Self {
83        Self {
84            status: OperatorStatus::Ok,
85            cbor_output: Some(output),
86            error: None,
87        }
88    }
89
90    pub fn error(code: OperatorErrorCode, message: impl Into<String>) -> Self {
91        Self {
92            status: OperatorStatus::Error,
93            cbor_output: None,
94            error: Some(OperatorError {
95                code,
96                message: message.into(),
97                details_cbor: None,
98            }),
99        }
100    }
101
102    pub fn to_cbor(&self) -> Result<Vec<u8>, serde_cbor::Error> {
103        serde_cbor::ser::to_vec_packed(self)
104    }
105}
106
/// Failure details embedded in an [`OperatorResponse`].
#[derive(Debug, Serialize)]
pub struct OperatorError {
    /// Machine-readable failure category.
    pub code: OperatorErrorCode,
    /// Human-readable description of the failure.
    pub message: String,
    /// Optional CBOR-encoded details; omitted from the encoding when `None`.
    /// `OperatorResponse::error` always leaves this unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details_cbor: Option<Vec<u8>>,
}
114
/// Overall outcome reported in an [`OperatorResponse`].
///
/// NOTE(review): the response is serialized in serde_cbor's packed format,
/// which presumably encodes variants by index — avoid reordering variants
/// until that is confirmed.
#[derive(Debug, Serialize)]
pub enum OperatorStatus {
    /// The operation ran and produced output (set by `OperatorResponse::ok`).
    Ok,
    /// The operation failed (set by `OperatorResponse::error`).
    Error,
}
120
/// Machine-readable failure categories for operator invocations.
///
/// NOTE(review): the response is serialized in serde_cbor's packed format,
/// which presumably encodes variants by index rather than by the
/// `SCREAMING_SNAKE_CASE` name — avoid reordering variants until confirmed.
#[derive(Debug, Serialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum OperatorErrorCode {
    /// The requested operation is not registered for the provider.
    OpNotFound,
    /// No provider matches the requested id / type.
    ProviderNotFound,
    /// The request names a tenant other than the one routing resolved.
    TenantNotAllowed,
    /// The request envelope is structurally invalid.
    InvalidRequest,
    /// The CBOR input payload could not be decoded.
    CborDecode,
    /// The decoded input does not fit the operation.
    TypeMismatch,
    /// The component backing the operation could not be loaded or located.
    ComponentLoad,
    /// The component trapped while being invoked.
    InvokeTrap,
    /// The invocation timed out.
    Timeout,
    /// Policy refused the provider, operation, pack or secret access.
    PolicyDenied,
    /// An internal host-side failure occurred.
    HostFailure,
}
136
137impl OperatorErrorCode {
138    pub fn reason(&self) -> &'static str {
139        match self {
140            OperatorErrorCode::OpNotFound => "op not found",
141            OperatorErrorCode::ProviderNotFound => "provider not found",
142            OperatorErrorCode::TenantNotAllowed => "tenant not allowed",
143            OperatorErrorCode::InvalidRequest => "invalid operator request",
144            OperatorErrorCode::CborDecode => "failed to decode CBOR payload",
145            OperatorErrorCode::TypeMismatch => "type mismatch between CBOR and operation",
146            OperatorErrorCode::ComponentLoad => "failed to load component",
147            OperatorErrorCode::InvokeTrap => "component trapped during invoke",
148            OperatorErrorCode::Timeout => "invocation timed out",
149            OperatorErrorCode::PolicyDenied => "policy denied the operation",
150            OperatorErrorCode::HostFailure => "internal host failure",
151        }
152    }
153}
154
155/// Invoke an operator request without assuming HTTP transport.
156pub async fn invoke_operator(
157    runtime: &TenantRuntime,
158    request: OperatorRequest,
159) -> OperatorResponse {
160    if let Some(request_tenant) = request.tenant_id.as_deref()
161        && request_tenant != runtime.tenant()
162    {
163        return OperatorResponse::error(
164            OperatorErrorCode::TenantNotAllowed,
165            format!(
166                "tenant mismatch: routing resolved `{}` but request wants `{request_tenant}`",
167                runtime.tenant(),
168            ),
169        );
170    }
171
172    if request.provider_id.is_none() && request.provider_type.is_none() {
173        return OperatorResponse::error(
174            OperatorErrorCode::InvalidRequest,
175            "operator invoke requires provider_id or provider_type".to_string(),
176        );
177    }
178
179    let tenant = runtime.tenant();
180    let root_span = span!(
181        Level::INFO,
182        "operator.invoke",
183        tenant = %tenant,
184        op_id = %request.op_id,
185        provider_id = ?request.provider_id,
186        provider_type = ?request.provider_type
187    );
188    let _root_guard = root_span.enter();
189
190    let provider_id = request.provider_id.as_deref();
191    let provider_type = request.provider_type.as_deref();
192    runtime
193        .operator_metrics()
194        .resolve_attempts
195        .fetch_add(1, Ordering::Relaxed);
196    let resolve_span = span!(Level::DEBUG, "resolve_op");
197    let _resolve_guard = resolve_span.enter();
198    let binding =
199        match runtime
200            .operator_registry()
201            .resolve(provider_id, provider_type, &request.op_id)
202        {
203            Ok(binding) => binding,
204            Err(err) => {
205                let (code, message) = match err {
206                    OperatorResolveError::ProviderNotFound => {
207                        let label = provider_id.or(provider_type).unwrap_or("unknown");
208                        (
209                            OperatorErrorCode::ProviderNotFound,
210                            format!("provider `{label}` not registered"),
211                        )
212                    }
213                    OperatorResolveError::OpNotFound => {
214                        let label = provider_id.or(provider_type).unwrap_or("unknown provider");
215                        (
216                            OperatorErrorCode::OpNotFound,
217                            format!("op `{}` not found for provider `{label}`", &request.op_id),
218                        )
219                    }
220                };
221                runtime
222                    .operator_metrics()
223                    .resolve_errors
224                    .fetch_add(1, Ordering::Relaxed);
225                let response = OperatorResponse::error(code, message);
226                return response;
227            }
228        };
229    drop(_resolve_guard);
230
231    let policy = &runtime.config().operator_policy;
232    if !policy.allows_provider(provider_id, binding.provider_type.as_str()) {
233        return OperatorResponse::error(
234            OperatorErrorCode::PolicyDenied,
235            format!(
236                "provider `{}` not allowed for tenant {}",
237                binding
238                    .provider_id
239                    .as_deref()
240                    .unwrap_or(&binding.provider_type),
241                runtime.config().tenant
242            ),
243        );
244    }
245    if !policy.allows_op(provider_id, binding.provider_type.as_str(), &binding.op_id) {
246        return OperatorResponse::error(
247            OperatorErrorCode::PolicyDenied,
248            format!(
249                "op `{}` is not permitted for provider `{}` on tenant {}",
250                binding.op_id,
251                binding
252                    .provider_id
253                    .as_deref()
254                    .unwrap_or(&binding.provider_type),
255                runtime.config().tenant
256            ),
257        );
258    }
259
260    if let Some(req_pack) = request.pack_id.as_deref() {
261        let binding_pack = binding
262            .pack_ref
263            .split('@')
264            .next()
265            .unwrap_or(&binding.pack_ref);
266        if binding_pack != req_pack {
267            return OperatorResponse::error(
268                OperatorErrorCode::PolicyDenied,
269                format!(
270                    "request bound to pack `{req_pack}`, but op lives in `{}`",
271                    binding.pack_ref
272                ),
273            );
274        }
275    }
276
277    let attachments = match resolve_attachments(&request.payload, runtime) {
278        Ok(map) => map,
279        Err(response) => return response,
280    };
281
282    let decode_span = span!(Level::DEBUG, "decode_cbor");
283    let _decode_guard = decode_span.enter();
284    let input_value = match decode_request_payload(&request.payload.cbor_input) {
285        Ok(value) => value,
286        Err(err) => {
287            runtime
288                .operator_metrics()
289                .cbor_decode_errors
290                .fetch_add(1, Ordering::Relaxed);
291            return OperatorResponse::error(OperatorErrorCode::CborDecode, format!("{err}"));
292        }
293    };
294    drop(_decode_guard);
295
296    let input_value = merge_input_with_attachments(input_value, attachments);
297
298    let input_json = match serde_json::to_string(&input_value) {
299        Ok(json) => json,
300        Err(err) => {
301            return OperatorResponse::error(
302                OperatorErrorCode::TypeMismatch,
303                format!("failed to serialise input JSON: {err}"),
304            );
305        }
306    };
307
308    let component_ref = &binding.runtime.component_ref;
309    let pack = match runtime.pack_for_component(component_ref) {
310        Some(pack) => pack,
311        None => {
312            return OperatorResponse::error(
313                OperatorErrorCode::ComponentLoad,
314                format!("component `{}` not found in tenant packs", component_ref),
315            );
316        }
317    };
318
319    let exec_ctx = build_exec_ctx(&request, runtime);
320    runtime
321        .operator_metrics()
322        .invoke_attempts
323        .fetch_add(1, Ordering::Relaxed);
324    let invoke_span = span!(Level::INFO, "invoke_component", component = %component_ref);
325    let _invoke_guard = invoke_span.enter();
326    let result = if binding.runtime.world.starts_with("greentic:provider-core") {
327        let input_bytes = input_json.clone().into_bytes();
328        let provider_binding = ProviderBinding {
329            provider_id: binding.provider_id.clone(),
330            provider_type: binding.provider_type.clone(),
331            component_ref: binding.runtime.component_ref.clone(),
332            export: binding.runtime.export.clone(),
333            world: binding.runtime.world.clone(),
334            config_json: None,
335            pack_ref: Some(binding.pack_ref.clone()),
336        };
337        match pack
338            .invoke_provider(&provider_binding, exec_ctx, &binding.op_id, input_bytes)
339            .await
340        {
341            Ok(value) => value,
342            Err(err) => {
343                runtime
344                    .operator_metrics()
345                    .invoke_errors
346                    .fetch_add(1, Ordering::Relaxed);
347                return OperatorResponse::error(
348                    OperatorErrorCode::HostFailure,
349                    format!("provider invoke failed: {err}"),
350                );
351            }
352        }
353    } else {
354        match pack
355            .invoke_component(
356                component_ref,
357                exec_ctx,
358                &binding.op_id,
359                None,
360                input_json.clone(),
361            )
362            .await
363        {
364            Ok(value) => value,
365            Err(err) => {
366                runtime
367                    .operator_metrics()
368                    .invoke_errors
369                    .fetch_add(1, Ordering::Relaxed);
370                return OperatorResponse::error(
371                    OperatorErrorCode::HostFailure,
372                    format!("component invoke failed: {err}"),
373                );
374            }
375        }
376    };
377    drop(_invoke_guard);
378
379    let encode_span = span!(Level::DEBUG, "encode_cbor");
380    let _encode_guard = encode_span.enter();
381    let output_bytes = match serde_cbor::to_vec(&result) {
382        Ok(bytes) => bytes,
383        Err(err) => {
384            return OperatorResponse::error(
385                OperatorErrorCode::HostFailure,
386                format!("failed to encode CBOR output: {err}"),
387            );
388        }
389    };
390    drop(_encode_guard);
391
392    OperatorResponse::ok(output_bytes)
393}
394
395/// Convenience helper that takes CBOR bytes and reuses `invoke_operator`.
396pub async fn invoke_operator_cbor(
397    runtime: &TenantRuntime,
398    req_cbor: &[u8],
399) -> Result<Vec<u8>, serde_cbor::Error> {
400    let request = OperatorRequest::from_cbor(req_cbor)?;
401    let response = invoke_operator(runtime, request).await;
402    response.to_cbor()
403}
404
405/// Axum handler stub for `/operator/op/invoke`.
406pub async fn invoke(
407    TenantRuntimeHandle { runtime, .. }: TenantRuntimeHandle,
408    _headers: HeaderMap,
409    body: Body,
410) -> Result<Response<Body>, Response<Body>> {
411    let bytes = match to_bytes(body, OPERATOR_BODY_LIMIT).await {
412        Ok(bytes) => bytes,
413        Err(err) => {
414            return Err(bad_request(format!("failed to read body: {err}")));
415        }
416    };
417
418    let request = match OperatorRequest::from_cbor(&bytes) {
419        Ok(request) => request,
420        Err(err) => {
421            return Err(bad_request(format!("failed to decode request CBOR: {err}")));
422        }
423    };
424
425    let response = invoke_operator(&runtime, request).await;
426    build_cbor_response(response)
427}
428
429fn bad_request(message: String) -> Response<Body> {
430    let payload = json!({ "error": message });
431    Response::builder()
432        .status(StatusCode::BAD_REQUEST)
433        .header("content-type", "application/json")
434        .body(Body::from(payload.to_string()))
435        .expect("building JSON error response must succeed")
436}
437
438#[allow(clippy::result_large_err)]
439fn build_cbor_response(response: OperatorResponse) -> Result<Response<Body>, Response<Body>> {
440    match response.to_cbor() {
441        Ok(bytes) => Ok(Response::builder()
442            .status(StatusCode::OK)
443            .header("content-type", CONTENT_TYPE_CBOR)
444            .body(Body::from(bytes))
445            .expect("building CBOR response must succeed")),
446        Err(err) => Err(bad_request(format!(
447            "failed to serialize response CBOR: {err}"
448        ))),
449    }
450}
451
452fn decode_request_payload(bytes: &[u8]) -> Result<Value, serde_cbor::Error> {
453    if bytes.is_empty() {
454        return Ok(Value::Null);
455    }
456    serde_cbor::from_slice(bytes)
457}
458
459fn build_exec_ctx(request: &OperatorRequest, runtime: &TenantRuntime) -> ComponentExecCtx {
460    let deadline_unix_ms = request.timeout.and_then(|timeout_ms| {
461        SystemTime::now()
462            .checked_add(Duration::from_millis(timeout_ms))
463            .and_then(|deadline| deadline.duration_since(UNIX_EPOCH).ok())
464            .map(|duration| duration.as_millis() as u64)
465    });
466
467    let tenant_ctx = ComponentTenantCtx {
468        tenant: runtime.config().tenant.clone(),
469        team: None,
470        user: None,
471        trace_id: request.trace_id.clone(),
472        correlation_id: request.correlation_id.clone(),
473        deadline_unix_ms,
474        attempt: 1,
475        idempotency_key: request.correlation_id.clone(),
476    };
477
478    ComponentExecCtx {
479        tenant: tenant_ctx,
480        flow_id: format!("operator/{}", request.op_id),
481        node_id: None,
482    }
483}
484
485fn resolve_attachments(
486    payload: &OperatorPayload,
487    runtime: &TenantRuntime,
488) -> Result<Map<String, Value>, OperatorResponse> {
489    let mut attachments = Map::new();
490    for attachment in &payload.attachments {
491        if let Some(kind) = AttachmentKind::from_metadata(attachment.metadata.as_ref()) {
492            match kind {
493                AttachmentKind::Secret { key, alias } => {
494                    let secret = runtime.get_secret(&key).map_err(|err| {
495                        OperatorResponse::error(
496                            OperatorErrorCode::PolicyDenied,
497                            format!("secret `{key}` access denied: {err}"),
498                        )
499                    })?;
500                    attachments.insert(alias, Value::String(secret));
501                }
502            }
503        }
504    }
505    Ok(attachments)
506}
507
508fn merge_input_with_attachments(input: Value, attachments: Map<String, Value>) -> Value {
509    if attachments.is_empty() {
510        return input;
511    }
512    match input {
513        Value::Object(mut map) => {
514            map.insert("_attachments".into(), Value::Object(attachments));
515            Value::Object(map)
516        }
517        other => {
518            let mut map = Map::new();
519            map.insert("input".into(), other);
520            map.insert("_attachments".into(), Value::Object(attachments));
521            Value::Object(map)
522        }
523    }
524}
525
/// Attachment flavours this module knows how to resolve, parsed from an
/// attachment's JSON metadata.
enum AttachmentKind {
    /// A secret to fetch from the runtime: `key` names the secret and `alias`
    /// is the name it is exposed under in the merged input.
    Secret { key: String, alias: String },
}
529
530impl AttachmentKind {
531    fn from_metadata(metadata: Option<&Value>) -> Option<Self> {
532        let metadata = metadata?.as_object()?;
533        let attachment_type = metadata.get("type")?.as_str()?;
534        match attachment_type {
535            "secret" => {
536                let key = metadata.get("key")?.as_str()?.to_string();
537                let alias = metadata
538                    .get("alias")
539                    .and_then(Value::as_str)
540                    .map(|value| value.to_string())
541                    .unwrap_or_else(|| key.clone());
542                Some(AttachmentKind::Secret { key, alias })
543            }
544            _ => None,
545        }
546    }
547}
548
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{Map, Value, json};

    /// Object input keeps its own fields and gains `_attachments`.
    #[test]
    fn merge_input_with_attachments_preserves_map_fields() {
        let mut attachments = Map::new();
        attachments.insert("secret".into(), json!("value"));
        let mut input_map = Map::new();
        input_map.insert("foo".into(), json!("bar"));
        let merged = merge_input_with_attachments(Value::Object(input_map), attachments.clone());
        let obj = merged.as_object().expect("should be object");
        assert_eq!(obj.get("foo"), Some(&json!("bar")));
        assert_eq!(obj.get("_attachments"), Some(&Value::Object(attachments)));
    }

    /// Non-object input is wrapped under `input` next to `_attachments`.
    #[test]
    fn merge_input_with_attachments_wraps_scalar() {
        let mut attachments = Map::new();
        attachments.insert("secret".into(), json!("value"));
        let merged =
            merge_input_with_attachments(Value::String("text".into()), attachments.clone());
        let obj = merged.as_object().expect("should be object");
        assert_eq!(obj.get("input"), Some(&Value::String("text".into())));
        assert_eq!(obj.get("_attachments"), Some(&Value::Object(attachments)));
    }

    /// With no attachments the input must come back untouched, whatever its
    /// shape.
    #[test]
    fn merge_input_without_attachments_returns_input_unchanged() {
        let object_input = json!({ "foo": "bar" });
        assert_eq!(
            merge_input_with_attachments(object_input.clone(), Map::new()),
            object_input
        );
        assert_eq!(
            merge_input_with_attachments(Value::Null, Map::new()),
            Value::Null
        );
    }

    /// A secret without an explicit alias is exposed under its key.
    #[test]
    fn attachment_kind_secret_alias_defaults_to_key() {
        let metadata = json!({
            "type": "secret",
            "key": "TOKEN"
        });
        let Some(AttachmentKind::Secret { key, alias }) =
            AttachmentKind::from_metadata(Some(&metadata))
        else {
            panic!("expected secret attachment");
        };
        assert_eq!(key, "TOKEN");
        assert_eq!(alias, "TOKEN");
    }

    /// An explicit alias overrides the key as the exposed name.
    #[test]
    fn attachment_kind_secret_with_alias() {
        let metadata = json!({
            "type": "secret",
            "key": "TOKEN",
            "alias": "api_token"
        });
        let Some(AttachmentKind::Secret { key, alias }) =
            AttachmentKind::from_metadata(Some(&metadata))
        else {
            panic!("expected secret attachment");
        };
        assert_eq!(key, "TOKEN");
        assert_eq!(alias, "api_token");
    }

    /// Missing, unknown or incomplete metadata never yields an attachment.
    #[test]
    fn attachment_kind_rejects_unrecognised_metadata() {
        assert!(AttachmentKind::from_metadata(None).is_none());
        assert!(AttachmentKind::from_metadata(Some(&json!("secret"))).is_none());
        assert!(AttachmentKind::from_metadata(Some(&json!({ "key": "TOKEN" }))).is_none());
        assert!(
            AttachmentKind::from_metadata(Some(&json!({ "type": "file", "key": "TOKEN" })))
                .is_none()
        );
        assert!(AttachmentKind::from_metadata(Some(&json!({ "type": "secret" }))).is_none());
    }

    /// An empty payload means "no input" and decodes to JSON null.
    #[test]
    fn decode_request_payload_treats_empty_input_as_null() {
        let decoded = decode_request_payload(&[]).expect("empty input must decode");
        assert_eq!(decoded, Value::Null);
    }
}