Skip to main content

greentic_runner_host/runner/
operator.rs

1use axum::{
2    body::{Body, to_bytes},
3    http::{HeaderMap, Response, StatusCode},
4};
5use serde::{Deserialize, Serialize};
6use serde_cbor;
7use serde_json::{Map, Value, json};
8use sha2::{Digest, Sha256};
9use std::sync::Arc;
10use std::sync::atomic::Ordering;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use tracing::{Level, span};
14
15use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
16use crate::operator_registry::{OperatorBinding, OperatorResolveError};
17use crate::provider::ProviderBinding;
18use crate::routing::TenantRuntimeHandle;
19use crate::runner::contract_cache::ContractSnapshot;
20use crate::runner::contract_introspection::introspect_component_contract;
21use crate::runner::i18n::{I18nText, resolve_text, select_locale};
22use crate::runner::schema_validator::validate_json_instance;
23use crate::runtime::TenantRuntime;
24
25const CONTENT_TYPE_CBOR: &str = "application/cbor";
26const FLAG_SKIP_OUTPUT_VALIDATE: &str = "skip-output-validate";
27const FLAG_PERMISSIVE_SCHEMA: &str = "permissive-schema";
28
/// Operator-facing invocation payload (CBOR envelope).
///
/// Decoded with [`OperatorRequest::from_cbor`]. Fields marked
/// `#[serde(default)]` may be omitted by the caller; `op_id` and `payload`
/// carry no default and are therefore required.
#[derive(Debug, Deserialize)]
pub struct OperatorRequest {
    /// Tenant the caller expects to run against. When present,
    /// `invoke_operator` rejects the request with `TenantNotAllowed` unless
    /// it equals the runtime's tenant.
    #[serde(default)]
    pub tenant_id: Option<String>,
    /// Provider instance selector. `invoke_operator` requires at least one of
    /// `provider_id` / `provider_type`.
    #[serde(default)]
    pub provider_id: Option<String>,
    /// Provider type selector; see `provider_id`.
    #[serde(default)]
    pub provider_type: Option<String>,
    /// When present, must equal the part of the resolved binding's `pack_ref`
    /// before the first `@`; otherwise the request is answered with
    /// `PolicyDenied`.
    #[serde(default)]
    pub pack_id: Option<String>,
    /// Operation to invoke. Trimmed before use; a blank value is treated as
    /// `run` (see `normalize_operation_id`).
    pub op_id: String,
    /// NOTE(review): not read in the visible part of this file — presumably
    /// consumed when the execution context is built; confirm.
    #[serde(default)]
    pub trace_id: Option<String>,
    /// NOTE(review): not read in the visible part of this file — presumably
    /// consumed when the execution context is built; confirm.
    #[serde(default)]
    pub correlation_id: Option<String>,
    /// Invocation timeout. NOTE(review): the unit is not established in the
    /// visible part of this file — confirm against the code that applies it.
    #[serde(default)]
    pub timeout: Option<u64>,
    /// Behaviour flags. `skip-output-validate` and `permissive-schema` are
    /// recognised (trimmed, ASCII case-insensitive) by
    /// `validation_options_from_flags`; any other flag is ignored there.
    #[serde(default)]
    pub flags: Vec<String>,
    /// NOTE(review): not read in the visible part of this file — confirm
    /// whether it is enforced anywhere.
    #[serde(default)]
    pub op_version: Option<String>,
    /// Schema hash the caller was built against. When present (and the
    /// contract snapshot has a hash), both values are normalized with
    /// `normalize_sha256_hash` and a mismatch yields `TypeMismatch`.
    #[serde(default)]
    pub schema_hash: Option<String>,
    /// Preferred locale; passed to `select_locale` to choose the language of
    /// diagnostic messages.
    #[serde(default)]
    pub locale: Option<String>,
    /// Operation input and attachment references.
    pub payload: OperatorPayload,
}
57
impl OperatorRequest {
    /// Decodes an operator request from a CBOR byte slice.
    ///
    /// # Errors
    ///
    /// Returns the `serde_cbor` error unchanged when `bytes` is not valid CBOR
    /// or does not match the shape of [`OperatorRequest`].
    pub fn from_cbor(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
        serde_cbor::from_slice(bytes)
    }
}
63
/// Input section of an [`OperatorRequest`].
#[derive(Debug, Deserialize)]
pub struct OperatorPayload {
    /// CBOR-encoded operation input; `invoke_operator` decodes it with
    /// `decode_request_payload`. Defaults to an empty vector when omitted.
    ///
    /// NOTE(review): a plain `Vec<u8>` is handled by serde as a sequence of
    /// integers rather than a CBOR byte string — confirm this is the wire
    /// shape callers are expected to send.
    #[serde(default)]
    #[serde(rename = "cbor_input")]
    pub cbor_input: Vec<u8>,
    /// Attachment references; resolved by `resolve_attachments` and merged
    /// into the decoded input by `merge_input_with_attachments`.
    #[serde(default)]
    pub attachments: Vec<AttachmentRef>,
}
72
/// Reference to an attachment that accompanies an operator request.
#[derive(Debug, Deserialize)]
pub struct AttachmentRef {
    /// Identifier of the attachment. NOTE(review): how the id is looked up is
    /// not visible in this part of the file — see `resolve_attachments`.
    pub id: String,
    /// Optional free-form JSON metadata supplied with the reference.
    #[serde(default)]
    pub metadata: Option<Value>,
}
79
/// Operator response envelope serialized back to CBOR.
///
/// Built through [`OperatorResponse::ok`], [`OperatorResponse::error`] or
/// [`OperatorResponse::error_with_diagnostics`]; those constructors set
/// exactly one of `cbor_output` / `error`.
#[derive(Debug, Serialize)]
pub struct OperatorResponse {
    /// Overall outcome of the invocation.
    pub status: OperatorStatus,
    /// Operation output bytes; set by `ok`, omitted from the serialized form
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cbor_output: Option<Vec<u8>>,
    /// Failure description; set by the error constructors, omitted from the
    /// serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<OperatorError>,
}
89
90impl OperatorResponse {
91    pub fn ok(output: Vec<u8>) -> Self {
92        Self {
93            status: OperatorStatus::Ok,
94            cbor_output: Some(output),
95            error: None,
96        }
97    }
98
99    pub fn error(code: OperatorErrorCode, message: impl Into<String>) -> Self {
100        Self {
101            status: OperatorStatus::Error,
102            cbor_output: None,
103            error: Some(OperatorError {
104                code,
105                message: message.into(),
106                details_cbor: None,
107            }),
108        }
109    }
110
111    pub fn error_with_diagnostics(
112        code: OperatorErrorCode,
113        message: impl Into<String>,
114        diagnostics: Vec<Diagnostic>,
115    ) -> Self {
116        let details_cbor = serde_cbor::to_vec(&diagnostics).ok();
117        Self {
118            status: OperatorStatus::Error,
119            cbor_output: None,
120            error: Some(OperatorError {
121                code,
122                message: message.into(),
123                details_cbor,
124            }),
125        }
126    }
127
128    pub fn to_cbor(&self) -> Result<Vec<u8>, serde_cbor::Error> {
129        serde_cbor::ser::to_vec_packed(self)
130    }
131}
132
/// Error section of an [`OperatorResponse`].
#[derive(Debug, Serialize)]
pub struct OperatorError {
    /// Machine-readable failure category.
    pub code: OperatorErrorCode,
    /// Human-readable description of the failure.
    pub message: String,
    /// CBOR-encoded `Vec<Diagnostic>` as produced by
    /// `OperatorResponse::error_with_diagnostics`; `None` for plain errors or
    /// when encoding the diagnostics failed. Omitted from the serialized form
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details_cbor: Option<Vec<u8>>,
}
140
/// Outcome discriminator of an [`OperatorResponse`].
///
/// No `rename_all` is applied here, unlike `OperatorErrorCode` and
/// `DiagnosticSeverity`.
#[derive(Debug, Serialize)]
pub enum OperatorStatus {
    /// The operation completed; set by `OperatorResponse::ok`.
    Ok,
    /// The operation failed; set by the `OperatorResponse` error constructors.
    Error,
}
146
/// Severity attached to a [`Diagnostic`]; serialized in `snake_case`.
///
/// The diagnostic builders in the visible part of this file only emit
/// `Error`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum DiagnosticSeverity {
    /// The condition caused the request to fail.
    Error,
    /// NOTE(review): not produced in the visible part of this file.
    Warning,
    /// NOTE(review): not produced in the visible part of this file.
    Info,
}
154
/// Structured, localizable description of a single problem found while
/// handling an operator request.
///
/// A list of these is CBOR-encoded into `OperatorError::details_cbor`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Diagnostic {
    /// Machine-readable identifier, e.g. `tenant_mismatch` or
    /// `schema_hash_mismatch`.
    pub code: String,
    /// Slash-separated location the problem refers to, e.g. `/tenant_id` or a
    /// path below `/input`.
    pub path: String,
    /// Severity of the problem.
    pub severity: DiagnosticSeverity,
    /// i18n key of the message, e.g. `runner.operator.tenant_mismatch`.
    pub message_key: String,
    /// Fallback text handed to `I18nText::new` together with `message_key`.
    pub fallback: String,
    /// Message text as returned by `resolve_text` for the request locale.
    pub message: String,
    /// Optional remediation hint; the builders in the visible part of this
    /// file always leave it `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub hint: Option<String>,
    /// Component reference the problem relates to, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub component_id: Option<String>,
    /// Digest associated with the component or runtime, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub digest: Option<String>,
    /// Operation id the problem relates to, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_id: Option<String>,
}
172
/// Failure categories reported in `OperatorError::code`.
///
/// Serialized in `SCREAMING_SNAKE_CASE`; `OperatorErrorCode::reason` returns
/// a short description for each variant.
#[derive(Debug, Clone, Copy, Serialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum OperatorErrorCode {
    /// The requested op is not registered for the provider.
    OpNotFound,
    /// No provider matches the requested id/type.
    ProviderNotFound,
    /// The request's tenant differs from the runtime's tenant.
    TenantNotAllowed,
    /// The request envelope is malformed or incomplete.
    InvalidRequest,
    /// The CBOR input payload could not be decoded.
    CborDecode,
    /// Input, schema or contract does not match the operation.
    TypeMismatch,
    /// The component could not be found or loaded.
    ComponentLoad,
    /// The component trapped during invocation.
    InvokeTrap,
    /// The invocation timed out.
    Timeout,
    /// Operator policy rejected the provider, op or pack.
    PolicyDenied,
    /// Internal host failure.
    HostFailure,
}
188
/// Per-request validation switches derived from `OperatorRequest::flags`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct ExecutionValidationOptions {
    /// Cleared by the `skip-output-validate` flag.
    validate_output: bool,
    /// Cleared by the `permissive-schema` flag; forwarded to
    /// `validate_json_instance` when validating the input.
    strict: bool,
}
194
impl Default for ExecutionValidationOptions {
    /// Strictest configuration: output validation on, strict schema checking
    /// on. Request flags can only relax these.
    fn default() -> Self {
        Self {
            validate_output: true,
            strict: true,
        }
    }
}
203
204fn validation_options_from_flags(flags: &[String]) -> ExecutionValidationOptions {
205    let mut options = ExecutionValidationOptions::default();
206    for flag in flags {
207        match flag.trim().to_ascii_lowercase().as_str() {
208            FLAG_SKIP_OUTPUT_VALIDATE => options.validate_output = false,
209            FLAG_PERMISSIVE_SCHEMA => options.strict = false,
210            _ => {}
211        }
212    }
213    options
214}
215
/// Returns `op_id` with surrounding whitespace removed, or `run` when nothing
/// is left after trimming.
fn normalize_operation_id(op_id: &str) -> String {
    match op_id.trim() {
        "" => String::from("run"),
        trimmed => trimmed.to_owned(),
    }
}
224
/// Brings a SHA-256 digest string into the canonical `sha256:<hex>` form so
/// two digests can be compared as plain strings.
///
/// Surrounding whitespace is removed and a `sha256:` prefix is added when
/// missing. The prefix is recognised ASCII case-insensitively, so
/// `SHA256:abc` is not double-prefixed. A digest made up solely of hex digits
/// is lower-cased, because the digests this file produces are lower-case hex
/// and hex is case-insensitive. Any other digest text is kept verbatim so a
/// non-hex encoding is never altered.
fn normalize_sha256_hash(value: &str) -> String {
    const PREFIX: &str = "sha256:";
    let trimmed = value.trim();
    // `get` returns `None` for inputs shorter than the prefix or when the cut
    // would split a multi-byte character, so slicing below cannot panic.
    let digest = match trimmed.get(..PREFIX.len()) {
        Some(head) if head.eq_ignore_ascii_case(PREFIX) => &trimmed[PREFIX.len()..],
        _ => trimmed,
    };
    if digest.bytes().all(|byte| byte.is_ascii_hexdigit()) {
        format!("{PREFIX}{}", digest.to_ascii_lowercase())
    } else {
        format!("{PREFIX}{digest}")
    }
}
233
234fn sha256_prefixed(bytes: &[u8]) -> String {
235    let mut hasher = Sha256::new();
236    hasher.update(bytes);
237    let digest = hasher.finalize();
238    format!("sha256:{}", to_hex(&digest))
239}
240
/// Encodes `digest` as lower-case hexadecimal, two characters per byte.
fn to_hex(digest: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(digest.len() * 2);
    for &byte in digest {
        // High nibble first, then low nibble.
        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
244
/// Material that `compute_contract_hashes` CBOR-encodes and hashes to obtain
/// the per-operation schema hash.
///
/// Field names and declaration order are part of the serialized bytes, so
/// renaming or reordering fields changes every resulting hash.
#[derive(Serialize)]
struct SchemaHashMaterial<'a> {
    /// Digest of the resolved component/pack.
    resolved_digest: &'a str,
    /// Reference of the component that implements the operation.
    component_ref: &'a str,
    /// Operation the schemas belong to.
    operation_id: &'a str,
    /// `world` of the provider binding.
    world: &'a str,
    /// `export` of the provider binding.
    export: &'a str,
    /// Canonicalized input schema.
    input_schema: &'a Value,
    /// Canonicalized output schema.
    output_schema: &'a Value,
    /// Canonicalized config schema.
    config_schema: &'a Value,
    /// State schema reference of the operator binding, if any.
    state_schema_ref: Option<&'a str>,
}
257
/// Material that `compute_contract_hashes` CBOR-encodes and hashes to obtain
/// the describe hash.
///
/// Unlike `SchemaHashMaterial` it carries `pack_ref` and omits the operation
/// id, config schema and state schema reference. Field names and declaration
/// order are part of the serialized bytes, so changing them changes every
/// resulting hash.
#[derive(Serialize)]
struct DescribeHashMaterial<'a> {
    /// Digest of the resolved component/pack.
    resolved_digest: &'a str,
    /// Reference of the component being described.
    component_ref: &'a str,
    /// `world` of the provider binding.
    world: &'a str,
    /// `export` of the provider binding.
    export: &'a str,
    /// Pack reference of the operator binding.
    pack_ref: &'a str,
    /// Canonicalized input schema.
    input_schema: &'a Value,
    /// Canonicalized output schema.
    output_schema: &'a Value,
}
268
269#[allow(clippy::too_many_arguments)]
270fn compute_contract_hashes(
271    resolved_digest: &str,
272    component_ref: &str,
273    operation_id: &str,
274    world: &str,
275    export: &str,
276    input_schema: &Value,
277    output_schema: &Value,
278    config_schema: &Value,
279    state_schema_ref: Option<&str>,
280    pack_ref: &str,
281) -> (String, String) {
282    let input_schema = canonicalize_json_value(input_schema.clone());
283    let output_schema = canonicalize_json_value(output_schema.clone());
284    let config_schema = canonicalize_json_value(config_schema.clone());
285    let describe_material = DescribeHashMaterial {
286        resolved_digest,
287        component_ref,
288        world,
289        export,
290        pack_ref,
291        input_schema: &input_schema,
292        output_schema: &output_schema,
293    };
294    let describe_bytes =
295        serde_cbor::to_vec(&describe_material).expect("describe hash material serialization");
296    let describe_hash = sha256_prefixed(&describe_bytes);
297
298    let schema_material = SchemaHashMaterial {
299        resolved_digest,
300        component_ref,
301        operation_id,
302        world,
303        export,
304        input_schema: &input_schema,
305        output_schema: &output_schema,
306        config_schema: &config_schema,
307        state_schema_ref,
308    };
309    let schema_bytes =
310        serde_cbor::to_vec(&schema_material).expect("schema hash material serialization");
311    let schema_hash = sha256_prefixed(&schema_bytes);
312    (describe_hash, schema_hash)
313}
314
315fn canonicalize_json_value(value: Value) -> Value {
316    match value {
317        Value::Object(map) => {
318            let mut ordered = serde_json::Map::new();
319            let mut keys = map.keys().cloned().collect::<Vec<_>>();
320            keys.sort();
321            for key in keys {
322                let normalized = map
323                    .get(&key)
324                    .cloned()
325                    .map(canonicalize_json_value)
326                    .unwrap_or(Value::Null);
327                ordered.insert(key, normalized);
328            }
329            Value::Object(ordered)
330        }
331        Value::Array(items) => {
332            Value::Array(items.into_iter().map(canonicalize_json_value).collect())
333        }
334        other => other,
335    }
336}
337
/// Guesses the output schema reference that corresponds to a config schema
/// reference by replacing every occurrence of `config` with `output`.
///
/// Returns `None` when no reference is given or when the reference does not
/// contain `config` (the replacement would leave it unchanged).
fn derive_output_schema_ref(config_schema_ref: Option<&str>) -> Option<String> {
    config_schema_ref.and_then(|config_ref| {
        let candidate = config_ref.replace("config", "output");
        (candidate != config_ref).then_some(candidate)
    })
}
347
348fn schema_issues_to_diagnostics(
349    issues: Vec<crate::runner::schema_validator::SchemaValidationIssue>,
350    path_prefix: &str,
351    component_ref: &str,
352    resolved_digest: &str,
353    op_id: &str,
354    locale: &str,
355) -> Vec<Diagnostic> {
356    issues
357        .into_iter()
358        .map(|issue| {
359            let text = I18nText::new(issue.message_key, issue.fallback);
360            let path = if path_prefix.is_empty() {
361                issue.path
362            } else if issue.path == "/" {
363                path_prefix.to_string()
364            } else {
365                format!("{path_prefix}{}", issue.path)
366            };
367            Diagnostic {
368                code: issue.code,
369                path,
370                severity: DiagnosticSeverity::Error,
371                message_key: text.message_key.clone(),
372                fallback: text.fallback.clone(),
373                message: resolve_text(&text, locale),
374                hint: None,
375                component_id: Some(component_ref.to_string()),
376                digest: Some(resolved_digest.to_string()),
377                operation_id: Some(op_id.to_string()),
378            }
379        })
380        .collect()
381}
382
383#[allow(clippy::too_many_arguments)]
384fn diagnostic_error(
385    code: &str,
386    path: &str,
387    message_key: &str,
388    fallback: String,
389    operation_id: Option<&str>,
390    component_id: Option<&str>,
391    digest: Option<&str>,
392    locale: &str,
393) -> Diagnostic {
394    let text = I18nText::new(message_key, fallback);
395    let message = resolve_text(&text, locale);
396    Diagnostic {
397        code: code.to_string(),
398        path: path.to_string(),
399        severity: DiagnosticSeverity::Error,
400        message_key: text.message_key,
401        message,
402        fallback: text.fallback,
403        hint: None,
404        component_id: component_id.map(ToString::to_string),
405        digest: digest.map(ToString::to_string),
406        operation_id: operation_id.map(ToString::to_string),
407    }
408}
409
410impl OperatorErrorCode {
411    pub fn reason(&self) -> &'static str {
412        match self {
413            OperatorErrorCode::OpNotFound => "op not found",
414            OperatorErrorCode::ProviderNotFound => "provider not found",
415            OperatorErrorCode::TenantNotAllowed => "tenant not allowed",
416            OperatorErrorCode::InvalidRequest => "invalid operator request",
417            OperatorErrorCode::CborDecode => "failed to decode CBOR payload",
418            OperatorErrorCode::TypeMismatch => "type mismatch between CBOR and operation",
419            OperatorErrorCode::ComponentLoad => "failed to load component",
420            OperatorErrorCode::InvokeTrap => "component trapped during invoke",
421            OperatorErrorCode::Timeout => "invocation timed out",
422            OperatorErrorCode::PolicyDenied => "policy denied the operation",
423            OperatorErrorCode::HostFailure => "internal host failure",
424        }
425    }
426}
427
428/// Invoke an operator request without assuming HTTP transport.
429pub async fn invoke_operator(
430    runtime: &TenantRuntime,
431    request: OperatorRequest,
432) -> OperatorResponse {
433    let op_id = normalize_operation_id(&request.op_id);
434    let validation_options = validation_options_from_flags(&request.flags);
435    let locale = select_locale(request.locale.as_deref());
436    if let Some(request_tenant) = request.tenant_id.as_deref()
437        && request_tenant != runtime.tenant()
438    {
439        let message = format!(
440            "tenant mismatch: routing resolved `{}` but request wants `{request_tenant}`",
441            runtime.tenant(),
442        );
443        return OperatorResponse::error_with_diagnostics(
444            OperatorErrorCode::TenantNotAllowed,
445            message.clone(),
446            vec![diagnostic_error(
447                "tenant_mismatch",
448                "/tenant_id",
449                "runner.operator.tenant_mismatch",
450                message,
451                Some(op_id.as_str()),
452                None,
453                runtime.digest(),
454                &locale,
455            )],
456        );
457    }
458
459    if request.provider_id.is_none() && request.provider_type.is_none() {
460        let message = "operator invoke requires provider_id or provider_type".to_string();
461        return OperatorResponse::error_with_diagnostics(
462            OperatorErrorCode::InvalidRequest,
463            message.clone(),
464            vec![diagnostic_error(
465                "missing_provider_selector",
466                "/provider_id",
467                "runner.operator.missing_provider_selector",
468                message,
469                Some(op_id.as_str()),
470                None,
471                runtime.digest(),
472                &locale,
473            )],
474        );
475    }
476
477    let tenant = runtime.tenant();
478    let root_span = span!(
479        Level::INFO,
480        "operator.invoke",
481        tenant = %tenant,
482        op_id = %op_id,
483        provider_id = ?request.provider_id,
484        provider_type = ?request.provider_type
485    );
486    let _root_guard = root_span.enter();
487
488    let provider_id = request.provider_id.as_deref();
489    let provider_type = request.provider_type.as_deref();
490    runtime
491        .operator_metrics()
492        .resolve_attempts
493        .fetch_add(1, Ordering::Relaxed);
494    let resolve_span = span!(Level::DEBUG, "resolve_op");
495    let _resolve_guard = resolve_span.enter();
496
497    let emit_resolve_error = |err: OperatorResolveError| {
498        let (code, message) = match err {
499            OperatorResolveError::ProviderNotFound => {
500                let label = provider_id.or(provider_type).unwrap_or("unknown");
501                (
502                    OperatorErrorCode::ProviderNotFound,
503                    format!("provider `{label}` not registered"),
504                )
505            }
506            OperatorResolveError::OpNotFound => {
507                let label = provider_id.or(provider_type).unwrap_or("unknown provider");
508                (
509                    OperatorErrorCode::OpNotFound,
510                    format!("op `{}` not found for provider `{label}`", &op_id),
511                )
512            }
513        };
514        runtime
515            .operator_metrics()
516            .resolve_errors
517            .fetch_add(1, Ordering::Relaxed);
518        let response = OperatorResponse::error(code, message);
519        let diagnostic = diagnostic_error(
520            match code {
521                OperatorErrorCode::ProviderNotFound => "provider_not_found",
522                OperatorErrorCode::OpNotFound => "op_not_found",
523                _ => "resolve_error",
524            },
525            "/op_id",
526            match code {
527                OperatorErrorCode::ProviderNotFound => "runner.operator.provider_not_found",
528                OperatorErrorCode::OpNotFound => "runner.operator.op_not_found",
529                _ => "runner.operator.resolve_error",
530            },
531            response
532                .error
533                .as_ref()
534                .map(|e| e.message.clone())
535                .unwrap_or_else(|| "operator resolve failed".to_string()),
536            Some(op_id.as_str()),
537            binding_component_ref_hint(provider_id, provider_type),
538            runtime.digest(),
539            &locale,
540        );
541        OperatorResponse::error_with_diagnostics(
542            code,
543            response
544                .error
545                .as_ref()
546                .map(|e| e.message.clone())
547                .unwrap_or_else(|| "operator resolve failed".to_string()),
548            vec![diagnostic],
549        )
550    };
551
552    // M1.1b: state-store-backed provider instances are not in
553    // OperatorRegistry's per_provider_id index (the registry is built from
554    // inline pack decls at load time, instances live in the tenant state
555    // store at providers/instances/{id}.json). For id-only dispatch, probe
556    // the state store via main_pack to derive the provider_type, then re-key
557    // the operator lookup by type. The probe's ProviderBinding is carried
558    // forward so the second-stage pack.resolve_provider can be skipped — one
559    // state-store read covers both the type-derivation and the runtime
560    // binding. This also makes the snapshot atomic (no probe-vs-dispatch
561    // TOCTOU on the instance file).
562    let initial = runtime
563        .operator_registry()
564        .resolve(provider_id, provider_type, &op_id);
565    let (binding, probe_binding): (OperatorBinding, Option<ProviderBinding>) = match initial {
566        Ok(b) => (b.clone(), None),
567        Err(OperatorResolveError::ProviderNotFound)
568            if let Some(id) = provider_id
569                && provider_type.is_none() =>
570        {
571            match runtime.main_pack().resolve_provider(Some(id), None) {
572                Ok(pb) => {
573                    let derived = pb.provider_type.clone();
574                    // Retry by type only: the probe proved this id maps to
575                    // that type. The strict OperatorRegistry contract would
576                    // reject (Some(id), _) again because the id is not in
577                    // per_provider_id.
578                    match runtime
579                        .operator_registry()
580                        .resolve(None, Some(derived.as_str()), &op_id)
581                    {
582                        Ok(b) => (b.clone(), Some(pb)),
583                        Err(err) => return emit_resolve_error(err),
584                    }
585                }
586                Err(probe_err) => {
587                    // Don't swallow — disabled instance, malformed JSON, or
588                    // state-store I/O errors are all actionable diagnostics
589                    // that get rolled up as a generic ProviderNotFound to
590                    // the caller but surface in logs for operators.
591                    tracing::warn!(
592                        provider_id = %id,
593                        error = %probe_err,
594                        "state-store probe failed for id-only operator dispatch"
595                    );
596                    return emit_resolve_error(OperatorResolveError::ProviderNotFound);
597                }
598            }
599        }
600        Err(err) => return emit_resolve_error(err),
601    };
602    drop(_resolve_guard);
603
604    let policy = &runtime.config().operator_policy;
605    if !policy.allows_provider(provider_id, binding.provider_type.as_str()) {
606        return OperatorResponse::error(
607            OperatorErrorCode::PolicyDenied,
608            format!(
609                "provider `{}` not allowed for tenant {}",
610                binding
611                    .provider_id
612                    .as_deref()
613                    .unwrap_or(&binding.provider_type),
614                runtime.config().tenant
615            ),
616        );
617    }
618    if !policy.allows_op(provider_id, binding.provider_type.as_str(), &binding.op_id) {
619        return OperatorResponse::error(
620            OperatorErrorCode::PolicyDenied,
621            format!(
622                "op `{}` is not permitted for provider `{}` on tenant {}",
623                binding.op_id,
624                binding
625                    .provider_id
626                    .as_deref()
627                    .unwrap_or(&binding.provider_type),
628                runtime.config().tenant
629            ),
630        );
631    }
632
633    if let Some(req_pack) = request.pack_id.as_deref() {
634        let binding_pack = binding
635            .pack_ref
636            .split('@')
637            .next()
638            .unwrap_or(&binding.pack_ref);
639        if binding_pack != req_pack {
640            return OperatorResponse::error(
641                OperatorErrorCode::PolicyDenied,
642                format!(
643                    "request bound to pack `{req_pack}`, but op lives in `{}`",
644                    binding.pack_ref
645                ),
646            );
647        }
648    }
649
650    let attachments = match resolve_attachments(&request.payload, runtime) {
651        Ok(map) => map,
652        Err(response) => return response,
653    };
654
655    let decode_span = span!(Level::DEBUG, "decode_cbor");
656    let _decode_guard = decode_span.enter();
657    let input_value = match decode_request_payload(&request.payload.cbor_input) {
658        Ok(value) => value,
659        Err(err) => {
660            runtime
661                .operator_metrics()
662                .cbor_decode_errors
663                .fetch_add(1, Ordering::Relaxed);
664            return OperatorResponse::error(OperatorErrorCode::CborDecode, format!("{err}"));
665        }
666    };
667    drop(_decode_guard);
668
669    let input_value = merge_input_with_attachments(input_value, attachments);
670
671    let registry_component_ref = &binding.runtime.component_ref;
672    let resolved = match runtime.resolve_component(registry_component_ref) {
673        Some(resolved) => resolved,
674        None => {
675            return OperatorResponse::error(
676                OperatorErrorCode::ComponentLoad,
677                format!(
678                    "component `{}` not found in tenant packs",
679                    registry_component_ref
680                ),
681            );
682        }
683    };
684    let pack = resolved.pack;
685    // When the probe above produced a ProviderBinding (id-only dispatch
686    // against a state-store instance), reuse it — it's already a coherent
687    // snapshot from the same load_instance call that derived the type, so
688    // there's nothing to drift against and no second state-store read is
689    // needed. For all other paths the second-stage resolve runs as before
690    // and ProviderRegistry's defense-in-depth check (binding.provider_type
691    // == requested) fires on the (Some(id), Some(type)) shape.
692    let provider_binding = match probe_binding {
693        Some(pb) => pb,
694        None => match pack.resolve_provider(provider_id, provider_type) {
695            Ok(binding) => binding,
696            Err(err) => {
697                return OperatorResponse::error(
698                    OperatorErrorCode::HostFailure,
699                    format!("failed to resolve provider runtime: {err}"),
700                );
701            }
702        },
703    };
704
705    let component_ref = &provider_binding.component_ref;
706    let resolved = match runtime.resolve_component(component_ref) {
707        Some(resolved) => resolved,
708        None => {
709            return OperatorResponse::error(
710                OperatorErrorCode::ComponentLoad,
711                format!("component `{}` not found in tenant packs", component_ref),
712            );
713        }
714    };
715    let pack = resolved.pack;
716    let resolved_digest = if resolved.digest == "unknown" {
717        binding
718            .pack_digest
719            .clone()
720            .unwrap_or_else(|| resolved.digest.clone())
721    } else {
722        resolved.digest.clone()
723    };
724    let introspected_contract =
725        match introspect_component_contract(pack.as_ref(), component_ref.as_str(), &op_id) {
726            Ok(value) => value,
727            Err(err) => {
728                let message = format!("failed to introspect component contract: {err}");
729                return OperatorResponse::error_with_diagnostics(
730                    OperatorErrorCode::TypeMismatch,
731                    message.clone(),
732                    vec![diagnostic_error(
733                        "contract_introspection_failed",
734                        "/operation",
735                        "runner.operator.contract_introspection_failed",
736                        message,
737                        Some(op_id.as_str()),
738                        Some(component_ref.as_str()),
739                        Some(resolved_digest.as_str()),
740                        &locale,
741                    )],
742                );
743            }
744        };
745    let invoke_op_id = introspected_contract
746        .as_ref()
747        .map(|contract| contract.selected_operation.clone())
748        .unwrap_or_else(|| op_id.clone());
749    let loaded_config_schema = introspected_contract
750        .as_ref()
751        .map(|contract| contract.config_schema.clone())
752        .filter(|value| !value.is_null())
753        .or_else(|| {
754            binding
755                .config_schema_ref
756                .as_deref()
757                .and_then(|schema_ref| pack.load_schema_json(schema_ref).ok().flatten())
758        })
759        .unwrap_or(Value::Null);
760    let loaded_output_schema = introspected_contract
761        .as_ref()
762        .map(|contract| contract.output_schema.clone())
763        .filter(|value| !value.is_null())
764        .or_else(|| {
765            derive_output_schema_ref(binding.config_schema_ref.as_deref())
766                .and_then(|schema_ref| pack.load_schema_json(&schema_ref).ok().flatten())
767        })
768        .unwrap_or(Value::Null);
769    let loaded_input_schema = introspected_contract
770        .as_ref()
771        .map(|contract| contract.input_schema.clone())
772        .filter(|value| !value.is_null())
773        .or_else(|| loaded_config_schema.is_null().then_some(Value::Null))
774        .or_else(|| Some(loaded_config_schema.clone()))
775        .unwrap_or(Value::Null);
776    let loaded_config_schema = binding
777        .config_schema_ref
778        .as_deref()
779        .and_then(|schema_ref| pack.load_schema_json(schema_ref).ok().flatten())
780        .unwrap_or_else(|| loaded_config_schema.clone());
781    let contract_key = format!(
782        "{}::{component_ref}::{op_id}::validate_output={}::strict={}",
783        resolved_digest, validation_options.validate_output, validation_options.strict
784    );
785    let _contract_snapshot = if let Some(snapshot) = runtime.contract_cache().get(&contract_key) {
786        snapshot
787    } else {
788        let (describe_hash, schema_hash) = introspected_contract
789            .as_ref()
790            .map(|contract| (contract.describe_hash.clone(), contract.schema_hash.clone()))
791            .unwrap_or_else(|| {
792                compute_contract_hashes(
793                    &resolved_digest,
794                    component_ref,
795                    &invoke_op_id,
796                    &provider_binding.world,
797                    &provider_binding.export,
798                    &loaded_input_schema,
799                    &loaded_output_schema,
800                    &loaded_config_schema,
801                    binding.state_schema_ref.as_deref(),
802                    &binding.pack_ref,
803                )
804            });
805        let mut snapshot = ContractSnapshot::new(
806            resolved_digest.clone(),
807            component_ref.clone(),
808            invoke_op_id.clone(),
809            validation_options.validate_output,
810            validation_options.strict,
811        );
812        snapshot.describe_hash = Some(describe_hash);
813        snapshot.schema_hash = Some(schema_hash);
814        let snapshot = Arc::new(snapshot);
815        runtime
816            .contract_cache()
817            .insert(contract_key, Arc::clone(&snapshot));
818        snapshot
819    };
820    if !loaded_input_schema.is_null() {
821        let issues = validate_json_instance(
822            &loaded_input_schema,
823            &input_value,
824            validation_options.strict,
825        );
826        if !issues.is_empty() {
827            let diagnostics = schema_issues_to_diagnostics(
828                issues,
829                "/input",
830                component_ref,
831                &resolved_digest,
832                &op_id,
833                &locale,
834            );
835            return OperatorResponse::error_with_diagnostics(
836                OperatorErrorCode::TypeMismatch,
837                "input failed schema validation".to_string(),
838                diagnostics,
839            );
840        }
841    } else if validation_options.strict && binding.config_schema_ref.is_some() {
842        let message = format!(
843            "schema `{}` referenced by op `{}` was not found in pack",
844            binding.config_schema_ref.as_deref().unwrap_or("unknown"),
845            op_id
846        );
847        return OperatorResponse::error_with_diagnostics(
848            OperatorErrorCode::TypeMismatch,
849            message.clone(),
850            vec![diagnostic_error(
851                "schema_ref_not_found",
852                "/schema_hash",
853                "runner.operator.schema_ref_not_found",
854                message,
855                Some(op_id.as_str()),
856                Some(component_ref.as_str()),
857                Some(resolved_digest.as_str()),
858                &locale,
859            )],
860        );
861    }
862
863    if let Some(request_schema_hash) = request.schema_hash.as_deref()
864        && let Some(expected_schema_hash) = _contract_snapshot.schema_hash.as_deref()
865    {
866        let expected = normalize_sha256_hash(expected_schema_hash);
867        let provided = normalize_sha256_hash(request_schema_hash);
868        if expected != provided {
869            let message = format!(
870                "schema_hash mismatch for op `{}`: expected `{}`, got `{}`",
871                op_id, expected, provided
872            );
873            return OperatorResponse::error_with_diagnostics(
874                OperatorErrorCode::TypeMismatch,
875                message.clone(),
876                vec![diagnostic_error(
877                    "schema_hash_mismatch",
878                    "/schema_hash",
879                    "runner.operator.schema_hash_mismatch",
880                    message,
881                    Some(op_id.as_str()),
882                    Some(component_ref.as_str()),
883                    Some(resolved_digest.as_str()),
884                    &locale,
885                )],
886            );
887        }
888    }
889
890    let input_json = match serde_json::to_string(&input_value) {
891        Ok(json) => json,
892        Err(err) => {
893            return OperatorResponse::error(
894                OperatorErrorCode::TypeMismatch,
895                format!("failed to serialise input JSON: {err}"),
896            );
897        }
898    };
899
900    let exec_ctx = build_exec_ctx(&request, runtime, &op_id);
901    runtime
902        .operator_metrics()
903        .invoke_attempts
904        .fetch_add(1, Ordering::Relaxed);
905    let invoke_span = span!(Level::INFO, "invoke_component", component = %component_ref);
906    let _invoke_guard = invoke_span.enter();
907    let result = if provider_binding.world.starts_with("greentic:provider-core") {
908        let input_bytes = input_json.clone().into_bytes();
909        match pack
910            .invoke_provider(&provider_binding, exec_ctx, &invoke_op_id, input_bytes)
911            .await
912        {
913            Ok(value) => value,
914            Err(err) => {
915                runtime
916                    .operator_metrics()
917                    .invoke_errors
918                    .fetch_add(1, Ordering::Relaxed);
919                return OperatorResponse::error(
920                    OperatorErrorCode::HostFailure,
921                    format!("provider invoke failed: {err}"),
922                );
923            }
924        }
925    } else {
926        match pack
927            .invoke_component(
928                component_ref,
929                exec_ctx,
930                &invoke_op_id,
931                provider_binding.config_json.clone(),
932                input_json.clone(),
933            )
934            .await
935        {
936            Ok(value) => value,
937            Err(err) => {
938                runtime
939                    .operator_metrics()
940                    .invoke_errors
941                    .fetch_add(1, Ordering::Relaxed);
942                return OperatorResponse::error(
943                    OperatorErrorCode::HostFailure,
944                    format!("component invoke failed: {err}"),
945                );
946            }
947        }
948    };
949    drop(_invoke_guard);
950
951    if validation_options.validate_output
952        && let Some(output_ref) = derive_output_schema_ref(binding.config_schema_ref.as_deref())
953        && let Ok(Some(output_schema)) = pack.load_schema_json(&output_ref)
954    {
955        let output_value = result
956            .as_object()
957            .and_then(|obj| obj.get("output"))
958            .unwrap_or(&result);
959        let issues =
960            validate_json_instance(&output_schema, output_value, validation_options.strict);
961        if !issues.is_empty() {
962            let diagnostics = schema_issues_to_diagnostics(
963                issues,
964                "/output",
965                component_ref,
966                &resolved_digest,
967                &op_id,
968                &locale,
969            );
970            return OperatorResponse::error_with_diagnostics(
971                OperatorErrorCode::TypeMismatch,
972                "output failed schema validation".to_string(),
973                diagnostics,
974            );
975        }
976    }
977
978    if let Some(new_state) = result.as_object().and_then(|obj| obj.get("new_state")) {
979        if let Some(config_ref) = binding.config_schema_ref.as_deref() {
980            let config_schema = match pack.load_schema_json(config_ref) {
981                Ok(Some(schema)) => schema,
982                Ok(None) => {
983                    let message = format!(
984                        "config schema `{}` required for new_state validation was not found",
985                        config_ref
986                    );
987                    return OperatorResponse::error_with_diagnostics(
988                        OperatorErrorCode::TypeMismatch,
989                        message.clone(),
990                        vec![diagnostic_error(
991                            "new_state_schema_missing",
992                            "/new_state",
993                            "runner.operator.new_state_schema_missing",
994                            message,
995                            Some(op_id.as_str()),
996                            Some(component_ref.as_str()),
997                            Some(resolved_digest.as_str()),
998                            &locale,
999                        )],
1000                    );
1001                }
1002                Err(err) => {
1003                    let message = format!(
1004                        "failed to load config schema `{}` for new_state validation: {}",
1005                        config_ref, err
1006                    );
1007                    return OperatorResponse::error_with_diagnostics(
1008                        OperatorErrorCode::TypeMismatch,
1009                        message.clone(),
1010                        vec![diagnostic_error(
1011                            "new_state_schema_load_failed",
1012                            "/new_state",
1013                            "runner.operator.new_state_schema_load_failed",
1014                            message,
1015                            Some(op_id.as_str()),
1016                            Some(component_ref.as_str()),
1017                            Some(resolved_digest.as_str()),
1018                            &locale,
1019                        )],
1020                    );
1021                }
1022            };
1023            let issues =
1024                validate_json_instance(&config_schema, new_state, validation_options.strict);
1025            if !issues.is_empty() {
1026                let diagnostics = schema_issues_to_diagnostics(
1027                    issues,
1028                    "/new_state",
1029                    component_ref,
1030                    &resolved_digest,
1031                    &op_id,
1032                    &locale,
1033                );
1034                return OperatorResponse::error_with_diagnostics(
1035                    OperatorErrorCode::TypeMismatch,
1036                    "new_state failed schema validation".to_string(),
1037                    diagnostics,
1038                );
1039            }
1040        } else if validation_options.strict {
1041            let message = "new_state returned but no config_schema_ref is available".to_string();
1042            return OperatorResponse::error_with_diagnostics(
1043                OperatorErrorCode::TypeMismatch,
1044                message.clone(),
1045                vec![diagnostic_error(
1046                    "new_state_schema_unavailable",
1047                    "/new_state",
1048                    "runner.operator.new_state_schema_unavailable",
1049                    message,
1050                    Some(op_id.as_str()),
1051                    Some(component_ref.as_str()),
1052                    Some(resolved_digest.as_str()),
1053                    &locale,
1054                )],
1055            );
1056        }
1057    }
1058
1059    let encode_span = span!(Level::DEBUG, "encode_cbor");
1060    let _encode_guard = encode_span.enter();
1061    let output_bytes = match serde_cbor::to_vec(&result) {
1062        Ok(bytes) => bytes,
1063        Err(err) => {
1064            return OperatorResponse::error(
1065                OperatorErrorCode::HostFailure,
1066                format!("failed to encode CBOR output: {err}"),
1067            );
1068        }
1069    };
1070    drop(_encode_guard);
1071
1072    OperatorResponse::ok(output_bytes)
1073}
1074
/// Chooses between the two optional provider identifiers.
///
/// `provider_id` wins whenever it is present; otherwise `provider_type` is
/// returned as-is, so the result is `None` only when both inputs are `None`.
fn binding_component_ref_hint<'a>(
    provider_id: Option<&'a str>,
    provider_type: Option<&'a str>,
) -> Option<&'a str> {
    match provider_id {
        Some(id) => Some(id),
        None => provider_type,
    }
}
1081
1082/// Convenience helper that takes CBOR bytes and reuses `invoke_operator`.
1083pub async fn invoke_operator_cbor(
1084    runtime: &TenantRuntime,
1085    req_cbor: &[u8],
1086) -> Result<Vec<u8>, serde_cbor::Error> {
1087    let request = OperatorRequest::from_cbor(req_cbor)?;
1088    let response = invoke_operator(runtime, request).await;
1089    response.to_cbor()
1090}
1091
1092/// Axum handler stub for `/operator/op/invoke`.
1093pub async fn invoke(
1094    TenantRuntimeHandle { runtime, .. }: TenantRuntimeHandle,
1095    _headers: HeaderMap,
1096    body: Body,
1097) -> Result<Response<Body>, Response<Body>> {
1098    let bytes = match to_bytes(body, usize::MAX).await {
1099        Ok(bytes) => bytes,
1100        Err(err) => {
1101            return Err(bad_request(format!("failed to read body: {err}")));
1102        }
1103    };
1104
1105    let request = match OperatorRequest::from_cbor(&bytes) {
1106        Ok(request) => request,
1107        Err(err) => {
1108            return Err(bad_request(format!("failed to decode request CBOR: {err}")));
1109        }
1110    };
1111
1112    let response = invoke_operator(&runtime, request).await;
1113    build_cbor_response(response)
1114}
1115
1116fn bad_request(message: String) -> Response<Body> {
1117    let payload = json!({ "error": message });
1118    Response::builder()
1119        .status(StatusCode::BAD_REQUEST)
1120        .header("content-type", "application/json")
1121        .body(Body::from(payload.to_string()))
1122        .expect("building JSON error response must succeed")
1123}
1124
1125#[allow(clippy::result_large_err)]
1126fn build_cbor_response(response: OperatorResponse) -> Result<Response<Body>, Response<Body>> {
1127    match response.to_cbor() {
1128        Ok(bytes) => Ok(Response::builder()
1129            .status(StatusCode::OK)
1130            .header("content-type", CONTENT_TYPE_CBOR)
1131            .body(Body::from(bytes))
1132            .expect("building CBOR response must succeed")),
1133        Err(err) => Err(bad_request(format!(
1134            "failed to serialize response CBOR: {err}"
1135        ))),
1136    }
1137}
1138
1139fn decode_request_payload(bytes: &[u8]) -> Result<Value, serde_cbor::Error> {
1140    if bytes.is_empty() {
1141        return Ok(Value::Null);
1142    }
1143    serde_cbor::from_slice(bytes)
1144}
1145
1146fn build_exec_ctx(
1147    request: &OperatorRequest,
1148    runtime: &TenantRuntime,
1149    operation_id: &str,
1150) -> ComponentExecCtx {
1151    let deadline_unix_ms = request.timeout.and_then(|timeout_ms| {
1152        SystemTime::now()
1153            .checked_add(Duration::from_millis(timeout_ms))
1154            .and_then(|deadline| deadline.duration_since(UNIX_EPOCH).ok())
1155            .map(|duration| duration.as_millis() as u64)
1156    });
1157
1158    let tenant_ctx = ComponentTenantCtx {
1159        tenant: runtime.config().tenant.clone(),
1160        team: None,
1161        user: None,
1162        trace_id: request.trace_id.clone(),
1163        i18n_id: None,
1164        correlation_id: request.correlation_id.clone(),
1165        deadline_unix_ms,
1166        attempt: 1,
1167        idempotency_key: request.correlation_id.clone(),
1168    };
1169
1170    ComponentExecCtx {
1171        tenant: tenant_ctx,
1172        i18n_id: None,
1173        flow_id: format!("operator/{operation_id}"),
1174        node_id: None,
1175    }
1176}
1177
1178fn resolve_attachments(
1179    payload: &OperatorPayload,
1180    runtime: &TenantRuntime,
1181) -> Result<Map<String, Value>, OperatorResponse> {
1182    let mut attachments = Map::new();
1183    for attachment in &payload.attachments {
1184        if let Some(kind) = AttachmentKind::from_metadata(attachment.metadata.as_ref()) {
1185            match kind {
1186                AttachmentKind::Secret { key, alias } => {
1187                    let secret = runtime.get_secret(&key).map_err(|err| {
1188                        OperatorResponse::error(
1189                            OperatorErrorCode::PolicyDenied,
1190                            format!("secret `{key}` access denied: {err}"),
1191                        )
1192                    })?;
1193                    attachments.insert(alias, Value::String(secret));
1194                }
1195            }
1196        }
1197    }
1198    Ok(attachments)
1199}
1200
1201fn merge_input_with_attachments(input: Value, attachments: Map<String, Value>) -> Value {
1202    if attachments.is_empty() {
1203        return input;
1204    }
1205    match input {
1206        Value::Object(mut map) => {
1207            map.insert("_attachments".into(), Value::Object(attachments));
1208            Value::Object(map)
1209        }
1210        other => {
1211            let mut map = Map::new();
1212            map.insert("input".into(), other);
1213            map.insert("_attachments".into(), Value::Object(attachments));
1214            Value::Object(map)
1215        }
1216    }
1217}
1218
1219enum AttachmentKind {
1220    Secret { key: String, alias: String },
1221}
1222
1223impl AttachmentKind {
1224    fn from_metadata(metadata: Option<&Value>) -> Option<Self> {
1225        let metadata = metadata?.as_object()?;
1226        let attachment_type = metadata.get("type")?.as_str()?;
1227        match attachment_type {
1228            "secret" => {
1229                let key = metadata.get("key")?.as_str()?.to_string();
1230                let alias = metadata
1231                    .get("alias")
1232                    .and_then(Value::as_str)
1233                    .map(|value| value.to_string())
1234                    .unwrap_or_else(|| key.clone());
1235                Some(AttachmentKind::Secret { key, alias })
1236            }
1237            _ => None,
1238        }
1239    }
1240}
1241
/// Unit tests for the attachment helpers, diagnostics encoding, validation
/// flags, operation-id normalization, and contract hashing.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{Map, Value, json};

    /// Single-entry attachment map shared by the merge tests.
    fn sample_attachments() -> Map<String, Value> {
        let mut attachments = Map::new();
        attachments.insert("secret".into(), json!("value"));
        attachments
    }

    #[test]
    fn merge_input_with_attachments_preserves_map_fields() {
        let attachments = sample_attachments();
        let mut input_map = Map::new();
        input_map.insert("foo".into(), json!("bar"));
        let merged = merge_input_with_attachments(Value::Object(input_map), attachments.clone());
        let obj = merged.as_object().expect("should be object");
        assert_eq!(obj.get("foo"), Some(&json!("bar")));
        assert_eq!(obj.get("_attachments"), Some(&Value::Object(attachments)));
    }

    #[test]
    fn merge_input_with_attachments_wraps_scalar() {
        let attachments = sample_attachments();
        let merged =
            merge_input_with_attachments(Value::String("text".into()), attachments.clone());
        let obj = merged.as_object().expect("should be object");
        assert_eq!(obj.get("input"), Some(&Value::String("text".into())));
        assert_eq!(obj.get("_attachments"), Some(&Value::Object(attachments)));
    }

    #[test]
    fn merge_input_without_attachments_returns_input_unchanged() {
        let object_input = json!({ "foo": "bar" });
        assert_eq!(
            merge_input_with_attachments(object_input.clone(), Map::new()),
            object_input
        );
        let scalar_input = json!("text");
        assert_eq!(
            merge_input_with_attachments(scalar_input.clone(), Map::new()),
            scalar_input
        );
    }

    #[test]
    fn attachment_kind_secret_alias_defaults_to_key() {
        let metadata = json!({
            "type": "secret",
            "key": "TOKEN"
        });
        let Some(AttachmentKind::Secret { key, alias }) =
            AttachmentKind::from_metadata(Some(&metadata))
        else {
            panic!("expected secret attachment");
        };
        assert_eq!(key, "TOKEN");
        assert_eq!(alias, "TOKEN");
    }

    #[test]
    fn attachment_kind_secret_with_alias() {
        let metadata = json!({
            "type": "secret",
            "key": "TOKEN",
            "alias": "api_token"
        });
        let Some(AttachmentKind::Secret { key, alias }) =
            AttachmentKind::from_metadata(Some(&metadata))
        else {
            panic!("expected secret attachment");
        };
        assert_eq!(key, "TOKEN");
        assert_eq!(alias, "api_token");
    }

    #[test]
    fn attachment_kind_ignores_unrecognized_metadata() {
        // No metadata at all.
        assert!(AttachmentKind::from_metadata(None).is_none());
        // Metadata that is not an object.
        assert!(AttachmentKind::from_metadata(Some(&json!("secret"))).is_none());
        // Object without a `type`.
        assert!(AttachmentKind::from_metadata(Some(&json!({ "key": "TOKEN" }))).is_none());
        // Unknown `type`.
        assert!(
            AttachmentKind::from_metadata(Some(&json!({ "type": "file", "key": "TOKEN" })))
                .is_none()
        );
        // Secret without the required `key`.
        assert!(AttachmentKind::from_metadata(Some(&json!({ "type": "secret" }))).is_none());
    }

    #[test]
    fn error_with_diagnostics_encodes_details_cbor() {
        let diagnostics = vec![Diagnostic {
            code: "op_not_found".to_string(),
            path: "/op_id".to_string(),
            severity: DiagnosticSeverity::Error,
            message_key: "runner.operator.op_not_found".to_string(),
            fallback: "op `echo` not found".to_string(),
            message: "op `echo` not found".to_string(),
            hint: None,
            component_id: Some("provider.demo".to_string()),
            digest: Some("sha256:abc123".to_string()),
            operation_id: Some("echo".to_string()),
        }];

        let response = OperatorResponse::error_with_diagnostics(
            OperatorErrorCode::OpNotFound,
            "op not found",
            diagnostics.clone(),
        );
        let details = response
            .error
            .as_ref()
            .and_then(|err| err.details_cbor.as_ref())
            .expect("details_cbor must exist");
        let decoded: Vec<Diagnostic> =
            serde_cbor::from_slice(details).expect("diagnostics should decode");
        assert_eq!(decoded, diagnostics);
    }

    #[test]
    fn validation_options_default_to_strict_with_output_validation() {
        let options = validation_options_from_flags(&[]);
        assert!(options.validate_output);
        assert!(options.strict);
    }

    #[test]
    fn validation_options_apply_known_flags() {
        let options = validation_options_from_flags(&[
            FLAG_SKIP_OUTPUT_VALIDATE.to_string(),
            FLAG_PERMISSIVE_SCHEMA.to_string(),
        ]);
        assert!(!options.validate_output);
        assert!(!options.strict);
    }

    #[test]
    fn normalize_operation_defaults_to_run_when_blank() {
        assert_eq!(normalize_operation_id(""), "run");
        assert_eq!(normalize_operation_id("   "), "run");
        assert_eq!(normalize_operation_id("render"), "render");
    }

    #[test]
    fn compute_contract_hashes_is_deterministic() {
        let input_schema = json!({
            "type": "object",
            "properties": {
                "message": { "type": "string" }
            }
        });
        let output_schema = json!({
            "type": "object",
            "properties": {
                "result": { "type": "string" }
            }
        });
        // One closure so both computations are guaranteed to see identical arguments.
        let hash_once = || {
            compute_contract_hashes(
                "sha256:abc",
                "provider.dummy",
                "echo",
                "greentic:provider-core@1.0.0",
                "provider-core",
                &input_schema,
                &output_schema,
                &input_schema,
                Some("schemas/state.schema.json"),
                "operator.provider@0.1.0",
            )
        };
        assert_eq!(hash_once(), hash_once());
    }
}