1use axum::{
2 body::{Body, to_bytes},
3 http::{HeaderMap, Response, StatusCode},
4};
5use serde::{Deserialize, Serialize};
6use serde_cbor;
7use serde_json::{Map, Value, json};
8use sha2::{Digest, Sha256};
9use std::sync::Arc;
10use std::sync::atomic::Ordering;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13use tracing::{Level, span};
14
15use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
16use crate::operator_registry::{OperatorBinding, OperatorResolveError};
17use crate::provider::ProviderBinding;
18use crate::routing::TenantRuntimeHandle;
19use crate::runner::contract_cache::ContractSnapshot;
20use crate::runner::contract_introspection::introspect_component_contract;
21use crate::runner::i18n::{I18nText, resolve_text, select_locale};
22use crate::runner::schema_validator::validate_json_instance;
23use crate::runtime::TenantRuntime;
24
/// `content-type` header value sent with CBOR-encoded HTTP responses.
const CONTENT_TYPE_CBOR: &str = "application/cbor";
/// Request flag that disables schema validation of the component output.
const FLAG_SKIP_OUTPUT_VALIDATE: &str = "skip-output-validate";
/// Request flag that switches schema validation out of strict mode.
const FLAG_PERMISSIVE_SCHEMA: &str = "permissive-schema";
28
29#[derive(Debug, Deserialize)]
31pub struct OperatorRequest {
32 #[serde(default)]
33 pub tenant_id: Option<String>,
34 #[serde(default)]
35 pub provider_id: Option<String>,
36 #[serde(default)]
37 pub provider_type: Option<String>,
38 #[serde(default)]
39 pub pack_id: Option<String>,
40 pub op_id: String,
41 #[serde(default)]
42 pub trace_id: Option<String>,
43 #[serde(default)]
44 pub correlation_id: Option<String>,
45 #[serde(default)]
46 pub timeout: Option<u64>,
47 #[serde(default)]
48 pub flags: Vec<String>,
49 #[serde(default)]
50 pub op_version: Option<String>,
51 #[serde(default)]
52 pub schema_hash: Option<String>,
53 #[serde(default)]
54 pub locale: Option<String>,
55 pub payload: OperatorPayload,
56}
57
58impl OperatorRequest {
59 pub fn from_cbor(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
60 serde_cbor::from_slice(bytes)
61 }
62}
63
64#[derive(Debug, Deserialize)]
65pub struct OperatorPayload {
66 #[serde(default)]
67 #[serde(rename = "cbor_input")]
68 pub cbor_input: Vec<u8>,
69 #[serde(default)]
70 pub attachments: Vec<AttachmentRef>,
71}
72
73#[derive(Debug, Deserialize)]
74pub struct AttachmentRef {
75 pub id: String,
76 #[serde(default)]
77 pub metadata: Option<Value>,
78}
79
80#[derive(Debug, Serialize)]
82pub struct OperatorResponse {
83 pub status: OperatorStatus,
84 #[serde(skip_serializing_if = "Option::is_none")]
85 pub cbor_output: Option<Vec<u8>>,
86 #[serde(skip_serializing_if = "Option::is_none")]
87 pub error: Option<OperatorError>,
88}
89
90impl OperatorResponse {
91 pub fn ok(output: Vec<u8>) -> Self {
92 Self {
93 status: OperatorStatus::Ok,
94 cbor_output: Some(output),
95 error: None,
96 }
97 }
98
99 pub fn error(code: OperatorErrorCode, message: impl Into<String>) -> Self {
100 Self {
101 status: OperatorStatus::Error,
102 cbor_output: None,
103 error: Some(OperatorError {
104 code,
105 message: message.into(),
106 details_cbor: None,
107 }),
108 }
109 }
110
111 pub fn error_with_diagnostics(
112 code: OperatorErrorCode,
113 message: impl Into<String>,
114 diagnostics: Vec<Diagnostic>,
115 ) -> Self {
116 let details_cbor = serde_cbor::to_vec(&diagnostics).ok();
117 Self {
118 status: OperatorStatus::Error,
119 cbor_output: None,
120 error: Some(OperatorError {
121 code,
122 message: message.into(),
123 details_cbor,
124 }),
125 }
126 }
127
128 pub fn to_cbor(&self) -> Result<Vec<u8>, serde_cbor::Error> {
129 serde_cbor::ser::to_vec_packed(self)
130 }
131}
132
133#[derive(Debug, Serialize)]
134pub struct OperatorError {
135 pub code: OperatorErrorCode,
136 pub message: String,
137 #[serde(skip_serializing_if = "Option::is_none")]
138 pub details_cbor: Option<Vec<u8>>,
139}
140
141#[derive(Debug, Serialize)]
142pub enum OperatorStatus {
143 Ok,
144 Error,
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
148#[serde(rename_all = "snake_case")]
149pub enum DiagnosticSeverity {
150 Error,
151 Warning,
152 Info,
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
156pub struct Diagnostic {
157 pub code: String,
158 pub path: String,
159 pub severity: DiagnosticSeverity,
160 pub message_key: String,
161 pub fallback: String,
162 pub message: String,
163 #[serde(skip_serializing_if = "Option::is_none")]
164 pub hint: Option<String>,
165 #[serde(skip_serializing_if = "Option::is_none")]
166 pub component_id: Option<String>,
167 #[serde(skip_serializing_if = "Option::is_none")]
168 pub digest: Option<String>,
169 #[serde(skip_serializing_if = "Option::is_none")]
170 pub operation_id: Option<String>,
171}
172
173#[derive(Debug, Clone, Copy, Serialize)]
174#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
175pub enum OperatorErrorCode {
176 OpNotFound,
177 ProviderNotFound,
178 TenantNotAllowed,
179 InvalidRequest,
180 CborDecode,
181 TypeMismatch,
182 ComponentLoad,
183 InvokeTrap,
184 Timeout,
185 PolicyDenied,
186 HostFailure,
187}
188
/// Validation switches derived from the request flags.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct ExecutionValidationOptions {
    /// Validate the component output against its schema.
    validate_output: bool,
    /// Run schema validation in strict mode.
    strict: bool,
}

impl Default for ExecutionValidationOptions {
    /// Everything is validated, strictly, unless a flag opts out.
    fn default() -> Self {
        let (validate_output, strict) = (true, true);
        ExecutionValidationOptions {
            validate_output,
            strict,
        }
    }
}
203
204fn validation_options_from_flags(flags: &[String]) -> ExecutionValidationOptions {
205 let mut options = ExecutionValidationOptions::default();
206 for flag in flags {
207 match flag.trim().to_ascii_lowercase().as_str() {
208 FLAG_SKIP_OUTPUT_VALIDATE => options.validate_output = false,
209 FLAG_PERMISSIVE_SCHEMA => options.strict = false,
210 _ => {}
211 }
212 }
213 options
214}
215
/// Trims `op_id`, substituting the default operation `"run"` when nothing is left.
fn normalize_operation_id(op_id: &str) -> String {
    match op_id.trim() {
        "" => String::from("run"),
        trimmed => trimmed.to_owned(),
    }
}
224
/// Brings a SHA-256 hash string into the canonical `sha256:<lowercase hex>` form.
///
/// Surrounding whitespace is removed, a `sha256:` prefix is recognised
/// regardless of ASCII case (and added when missing), and the digest is
/// lowercased. Hex digests are case-insensitive, so `SHA256:ABCD` and `abcd`
/// both normalise to `sha256:abcd` and compare equal instead of being
/// reported as a mismatch.
fn normalize_sha256_hash(value: &str) -> String {
    const PREFIX: &str = "sha256:";
    let trimmed = value.trim();
    // `get` returns `None` for short input or a non-char-boundary cut, in which
    // case the value cannot start with the ASCII prefix anyway.
    let digest = match trimmed.get(..PREFIX.len()) {
        Some(head) if head.eq_ignore_ascii_case(PREFIX) => &trimmed[PREFIX.len()..],
        _ => trimmed,
    };
    format!("{PREFIX}{}", digest.to_ascii_lowercase())
}
233
234fn sha256_prefixed(bytes: &[u8]) -> String {
235 let mut hasher = Sha256::new();
236 hasher.update(bytes);
237 let digest = hasher.finalize();
238 format!("sha256:{}", to_hex(&digest))
239}
240
/// Encodes `digest` as lowercase hexadecimal, two characters per byte.
fn to_hex(digest: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(digest.len() * 2);
    for &byte in digest {
        // High nibble first, then low nibble.
        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
244
245#[derive(Serialize)]
246struct SchemaHashMaterial<'a> {
247 resolved_digest: &'a str,
248 component_ref: &'a str,
249 operation_id: &'a str,
250 world: &'a str,
251 export: &'a str,
252 input_schema: &'a Value,
253 output_schema: &'a Value,
254 config_schema: &'a Value,
255 state_schema_ref: Option<&'a str>,
256}
257
258#[derive(Serialize)]
259struct DescribeHashMaterial<'a> {
260 resolved_digest: &'a str,
261 component_ref: &'a str,
262 world: &'a str,
263 export: &'a str,
264 pack_ref: &'a str,
265 input_schema: &'a Value,
266 output_schema: &'a Value,
267}
268
269#[allow(clippy::too_many_arguments)]
270fn compute_contract_hashes(
271 resolved_digest: &str,
272 component_ref: &str,
273 operation_id: &str,
274 world: &str,
275 export: &str,
276 input_schema: &Value,
277 output_schema: &Value,
278 config_schema: &Value,
279 state_schema_ref: Option<&str>,
280 pack_ref: &str,
281) -> (String, String) {
282 let input_schema = canonicalize_json_value(input_schema.clone());
283 let output_schema = canonicalize_json_value(output_schema.clone());
284 let config_schema = canonicalize_json_value(config_schema.clone());
285 let describe_material = DescribeHashMaterial {
286 resolved_digest,
287 component_ref,
288 world,
289 export,
290 pack_ref,
291 input_schema: &input_schema,
292 output_schema: &output_schema,
293 };
294 let describe_bytes =
295 serde_cbor::to_vec(&describe_material).expect("describe hash material serialization");
296 let describe_hash = sha256_prefixed(&describe_bytes);
297
298 let schema_material = SchemaHashMaterial {
299 resolved_digest,
300 component_ref,
301 operation_id,
302 world,
303 export,
304 input_schema: &input_schema,
305 output_schema: &output_schema,
306 config_schema: &config_schema,
307 state_schema_ref,
308 };
309 let schema_bytes =
310 serde_cbor::to_vec(&schema_material).expect("schema hash material serialization");
311 let schema_hash = sha256_prefixed(&schema_bytes);
312 (describe_hash, schema_hash)
313}
314
315fn canonicalize_json_value(value: Value) -> Value {
316 match value {
317 Value::Object(map) => {
318 let mut ordered = serde_json::Map::new();
319 let mut keys = map.keys().cloned().collect::<Vec<_>>();
320 keys.sort();
321 for key in keys {
322 let normalized = map
323 .get(&key)
324 .cloned()
325 .map(canonicalize_json_value)
326 .unwrap_or(Value::Null);
327 ordered.insert(key, normalized);
328 }
329 Value::Object(ordered)
330 }
331 Value::Array(items) => {
332 Value::Array(items.into_iter().map(canonicalize_json_value).collect())
333 }
334 other => other,
335 }
336}
337
/// Derives the output schema reference from the config schema reference by
/// replacing every occurrence of `config` with `output`.
///
/// Returns `None` when there is no config reference or when it does not
/// contain `config`, in which case no distinct output reference can be derived.
fn derive_output_schema_ref(config_schema_ref: Option<&str>) -> Option<String> {
    config_schema_ref
        .filter(|config_ref| config_ref.contains("config"))
        .map(|config_ref| config_ref.replace("config", "output"))
}
347
348fn schema_issues_to_diagnostics(
349 issues: Vec<crate::runner::schema_validator::SchemaValidationIssue>,
350 path_prefix: &str,
351 component_ref: &str,
352 resolved_digest: &str,
353 op_id: &str,
354 locale: &str,
355) -> Vec<Diagnostic> {
356 issues
357 .into_iter()
358 .map(|issue| {
359 let text = I18nText::new(issue.message_key, issue.fallback);
360 let path = if path_prefix.is_empty() {
361 issue.path
362 } else if issue.path == "/" {
363 path_prefix.to_string()
364 } else {
365 format!("{path_prefix}{}", issue.path)
366 };
367 Diagnostic {
368 code: issue.code,
369 path,
370 severity: DiagnosticSeverity::Error,
371 message_key: text.message_key.clone(),
372 fallback: text.fallback.clone(),
373 message: resolve_text(&text, locale),
374 hint: None,
375 component_id: Some(component_ref.to_string()),
376 digest: Some(resolved_digest.to_string()),
377 operation_id: Some(op_id.to_string()),
378 }
379 })
380 .collect()
381}
382
383#[allow(clippy::too_many_arguments)]
384fn diagnostic_error(
385 code: &str,
386 path: &str,
387 message_key: &str,
388 fallback: String,
389 operation_id: Option<&str>,
390 component_id: Option<&str>,
391 digest: Option<&str>,
392 locale: &str,
393) -> Diagnostic {
394 let text = I18nText::new(message_key, fallback);
395 let message = resolve_text(&text, locale);
396 Diagnostic {
397 code: code.to_string(),
398 path: path.to_string(),
399 severity: DiagnosticSeverity::Error,
400 message_key: text.message_key,
401 message,
402 fallback: text.fallback,
403 hint: None,
404 component_id: component_id.map(ToString::to_string),
405 digest: digest.map(ToString::to_string),
406 operation_id: operation_id.map(ToString::to_string),
407 }
408}
409
410impl OperatorErrorCode {
411 pub fn reason(&self) -> &'static str {
412 match self {
413 OperatorErrorCode::OpNotFound => "op not found",
414 OperatorErrorCode::ProviderNotFound => "provider not found",
415 OperatorErrorCode::TenantNotAllowed => "tenant not allowed",
416 OperatorErrorCode::InvalidRequest => "invalid operator request",
417 OperatorErrorCode::CborDecode => "failed to decode CBOR payload",
418 OperatorErrorCode::TypeMismatch => "type mismatch between CBOR and operation",
419 OperatorErrorCode::ComponentLoad => "failed to load component",
420 OperatorErrorCode::InvokeTrap => "component trapped during invoke",
421 OperatorErrorCode::Timeout => "invocation timed out",
422 OperatorErrorCode::PolicyDenied => "policy denied the operation",
423 OperatorErrorCode::HostFailure => "internal host failure",
424 }
425 }
426}
427
428pub async fn invoke_operator(
430 runtime: &TenantRuntime,
431 request: OperatorRequest,
432) -> OperatorResponse {
433 let op_id = normalize_operation_id(&request.op_id);
434 let validation_options = validation_options_from_flags(&request.flags);
435 let locale = select_locale(request.locale.as_deref());
436 if let Some(request_tenant) = request.tenant_id.as_deref()
437 && request_tenant != runtime.tenant()
438 {
439 let message = format!(
440 "tenant mismatch: routing resolved `{}` but request wants `{request_tenant}`",
441 runtime.tenant(),
442 );
443 return OperatorResponse::error_with_diagnostics(
444 OperatorErrorCode::TenantNotAllowed,
445 message.clone(),
446 vec![diagnostic_error(
447 "tenant_mismatch",
448 "/tenant_id",
449 "runner.operator.tenant_mismatch",
450 message,
451 Some(op_id.as_str()),
452 None,
453 runtime.digest(),
454 &locale,
455 )],
456 );
457 }
458
459 if request.provider_id.is_none() && request.provider_type.is_none() {
460 let message = "operator invoke requires provider_id or provider_type".to_string();
461 return OperatorResponse::error_with_diagnostics(
462 OperatorErrorCode::InvalidRequest,
463 message.clone(),
464 vec![diagnostic_error(
465 "missing_provider_selector",
466 "/provider_id",
467 "runner.operator.missing_provider_selector",
468 message,
469 Some(op_id.as_str()),
470 None,
471 runtime.digest(),
472 &locale,
473 )],
474 );
475 }
476
477 let tenant = runtime.tenant();
478 let root_span = span!(
479 Level::INFO,
480 "operator.invoke",
481 tenant = %tenant,
482 op_id = %op_id,
483 provider_id = ?request.provider_id,
484 provider_type = ?request.provider_type
485 );
486 let _root_guard = root_span.enter();
487
488 let provider_id = request.provider_id.as_deref();
489 let provider_type = request.provider_type.as_deref();
490 runtime
491 .operator_metrics()
492 .resolve_attempts
493 .fetch_add(1, Ordering::Relaxed);
494 let resolve_span = span!(Level::DEBUG, "resolve_op");
495 let _resolve_guard = resolve_span.enter();
496
497 let emit_resolve_error = |err: OperatorResolveError| {
498 let (code, message) = match err {
499 OperatorResolveError::ProviderNotFound => {
500 let label = provider_id.or(provider_type).unwrap_or("unknown");
501 (
502 OperatorErrorCode::ProviderNotFound,
503 format!("provider `{label}` not registered"),
504 )
505 }
506 OperatorResolveError::OpNotFound => {
507 let label = provider_id.or(provider_type).unwrap_or("unknown provider");
508 (
509 OperatorErrorCode::OpNotFound,
510 format!("op `{}` not found for provider `{label}`", &op_id),
511 )
512 }
513 };
514 runtime
515 .operator_metrics()
516 .resolve_errors
517 .fetch_add(1, Ordering::Relaxed);
518 let response = OperatorResponse::error(code, message);
519 let diagnostic = diagnostic_error(
520 match code {
521 OperatorErrorCode::ProviderNotFound => "provider_not_found",
522 OperatorErrorCode::OpNotFound => "op_not_found",
523 _ => "resolve_error",
524 },
525 "/op_id",
526 match code {
527 OperatorErrorCode::ProviderNotFound => "runner.operator.provider_not_found",
528 OperatorErrorCode::OpNotFound => "runner.operator.op_not_found",
529 _ => "runner.operator.resolve_error",
530 },
531 response
532 .error
533 .as_ref()
534 .map(|e| e.message.clone())
535 .unwrap_or_else(|| "operator resolve failed".to_string()),
536 Some(op_id.as_str()),
537 binding_component_ref_hint(provider_id, provider_type),
538 runtime.digest(),
539 &locale,
540 );
541 OperatorResponse::error_with_diagnostics(
542 code,
543 response
544 .error
545 .as_ref()
546 .map(|e| e.message.clone())
547 .unwrap_or_else(|| "operator resolve failed".to_string()),
548 vec![diagnostic],
549 )
550 };
551
552 let initial = runtime
563 .operator_registry()
564 .resolve(provider_id, provider_type, &op_id);
565 let (binding, probe_binding): (OperatorBinding, Option<ProviderBinding>) = match initial {
566 Ok(b) => (b.clone(), None),
567 Err(OperatorResolveError::ProviderNotFound)
568 if let Some(id) = provider_id
569 && provider_type.is_none() =>
570 {
571 match runtime.main_pack().resolve_provider(Some(id), None) {
572 Ok(pb) => {
573 let derived = pb.provider_type.clone();
574 match runtime
579 .operator_registry()
580 .resolve(None, Some(derived.as_str()), &op_id)
581 {
582 Ok(b) => (b.clone(), Some(pb)),
583 Err(err) => return emit_resolve_error(err),
584 }
585 }
586 Err(probe_err) => {
587 tracing::warn!(
592 provider_id = %id,
593 error = %probe_err,
594 "state-store probe failed for id-only operator dispatch"
595 );
596 return emit_resolve_error(OperatorResolveError::ProviderNotFound);
597 }
598 }
599 }
600 Err(err) => return emit_resolve_error(err),
601 };
602 drop(_resolve_guard);
603
604 let policy = &runtime.config().operator_policy;
605 if !policy.allows_provider(provider_id, binding.provider_type.as_str()) {
606 return OperatorResponse::error(
607 OperatorErrorCode::PolicyDenied,
608 format!(
609 "provider `{}` not allowed for tenant {}",
610 binding
611 .provider_id
612 .as_deref()
613 .unwrap_or(&binding.provider_type),
614 runtime.config().tenant
615 ),
616 );
617 }
618 if !policy.allows_op(provider_id, binding.provider_type.as_str(), &binding.op_id) {
619 return OperatorResponse::error(
620 OperatorErrorCode::PolicyDenied,
621 format!(
622 "op `{}` is not permitted for provider `{}` on tenant {}",
623 binding.op_id,
624 binding
625 .provider_id
626 .as_deref()
627 .unwrap_or(&binding.provider_type),
628 runtime.config().tenant
629 ),
630 );
631 }
632
633 if let Some(req_pack) = request.pack_id.as_deref() {
634 let binding_pack = binding
635 .pack_ref
636 .split('@')
637 .next()
638 .unwrap_or(&binding.pack_ref);
639 if binding_pack != req_pack {
640 return OperatorResponse::error(
641 OperatorErrorCode::PolicyDenied,
642 format!(
643 "request bound to pack `{req_pack}`, but op lives in `{}`",
644 binding.pack_ref
645 ),
646 );
647 }
648 }
649
650 let attachments = match resolve_attachments(&request.payload, runtime) {
651 Ok(map) => map,
652 Err(response) => return response,
653 };
654
655 let decode_span = span!(Level::DEBUG, "decode_cbor");
656 let _decode_guard = decode_span.enter();
657 let input_value = match decode_request_payload(&request.payload.cbor_input) {
658 Ok(value) => value,
659 Err(err) => {
660 runtime
661 .operator_metrics()
662 .cbor_decode_errors
663 .fetch_add(1, Ordering::Relaxed);
664 return OperatorResponse::error(OperatorErrorCode::CborDecode, format!("{err}"));
665 }
666 };
667 drop(_decode_guard);
668
669 let input_value = merge_input_with_attachments(input_value, attachments);
670
671 let registry_component_ref = &binding.runtime.component_ref;
672 let resolved = match runtime.resolve_component(registry_component_ref) {
673 Some(resolved) => resolved,
674 None => {
675 return OperatorResponse::error(
676 OperatorErrorCode::ComponentLoad,
677 format!(
678 "component `{}` not found in tenant packs",
679 registry_component_ref
680 ),
681 );
682 }
683 };
684 let pack = resolved.pack;
685 let provider_binding = match probe_binding {
693 Some(pb) => pb,
694 None => match pack.resolve_provider(provider_id, provider_type) {
695 Ok(binding) => binding,
696 Err(err) => {
697 return OperatorResponse::error(
698 OperatorErrorCode::HostFailure,
699 format!("failed to resolve provider runtime: {err}"),
700 );
701 }
702 },
703 };
704
705 let component_ref = &provider_binding.component_ref;
706 let resolved = match runtime.resolve_component(component_ref) {
707 Some(resolved) => resolved,
708 None => {
709 return OperatorResponse::error(
710 OperatorErrorCode::ComponentLoad,
711 format!("component `{}` not found in tenant packs", component_ref),
712 );
713 }
714 };
715 let pack = resolved.pack;
716 let resolved_digest = if resolved.digest == "unknown" {
717 binding
718 .pack_digest
719 .clone()
720 .unwrap_or_else(|| resolved.digest.clone())
721 } else {
722 resolved.digest.clone()
723 };
724 let introspected_contract =
725 match introspect_component_contract(pack.as_ref(), component_ref.as_str(), &op_id) {
726 Ok(value) => value,
727 Err(err) => {
728 let message = format!("failed to introspect component contract: {err}");
729 return OperatorResponse::error_with_diagnostics(
730 OperatorErrorCode::TypeMismatch,
731 message.clone(),
732 vec![diagnostic_error(
733 "contract_introspection_failed",
734 "/operation",
735 "runner.operator.contract_introspection_failed",
736 message,
737 Some(op_id.as_str()),
738 Some(component_ref.as_str()),
739 Some(resolved_digest.as_str()),
740 &locale,
741 )],
742 );
743 }
744 };
745 let invoke_op_id = introspected_contract
746 .as_ref()
747 .map(|contract| contract.selected_operation.clone())
748 .unwrap_or_else(|| op_id.clone());
749 let loaded_config_schema = introspected_contract
750 .as_ref()
751 .map(|contract| contract.config_schema.clone())
752 .filter(|value| !value.is_null())
753 .or_else(|| {
754 binding
755 .config_schema_ref
756 .as_deref()
757 .and_then(|schema_ref| pack.load_schema_json(schema_ref).ok().flatten())
758 })
759 .unwrap_or(Value::Null);
760 let loaded_output_schema = introspected_contract
761 .as_ref()
762 .map(|contract| contract.output_schema.clone())
763 .filter(|value| !value.is_null())
764 .or_else(|| {
765 derive_output_schema_ref(binding.config_schema_ref.as_deref())
766 .and_then(|schema_ref| pack.load_schema_json(&schema_ref).ok().flatten())
767 })
768 .unwrap_or(Value::Null);
769 let loaded_input_schema = introspected_contract
770 .as_ref()
771 .map(|contract| contract.input_schema.clone())
772 .filter(|value| !value.is_null())
773 .or_else(|| loaded_config_schema.is_null().then_some(Value::Null))
774 .or_else(|| Some(loaded_config_schema.clone()))
775 .unwrap_or(Value::Null);
776 let loaded_config_schema = binding
777 .config_schema_ref
778 .as_deref()
779 .and_then(|schema_ref| pack.load_schema_json(schema_ref).ok().flatten())
780 .unwrap_or_else(|| loaded_config_schema.clone());
781 let contract_key = format!(
782 "{}::{component_ref}::{op_id}::validate_output={}::strict={}",
783 resolved_digest, validation_options.validate_output, validation_options.strict
784 );
785 let _contract_snapshot = if let Some(snapshot) = runtime.contract_cache().get(&contract_key) {
786 snapshot
787 } else {
788 let (describe_hash, schema_hash) = introspected_contract
789 .as_ref()
790 .map(|contract| (contract.describe_hash.clone(), contract.schema_hash.clone()))
791 .unwrap_or_else(|| {
792 compute_contract_hashes(
793 &resolved_digest,
794 component_ref,
795 &invoke_op_id,
796 &provider_binding.world,
797 &provider_binding.export,
798 &loaded_input_schema,
799 &loaded_output_schema,
800 &loaded_config_schema,
801 binding.state_schema_ref.as_deref(),
802 &binding.pack_ref,
803 )
804 });
805 let mut snapshot = ContractSnapshot::new(
806 resolved_digest.clone(),
807 component_ref.clone(),
808 invoke_op_id.clone(),
809 validation_options.validate_output,
810 validation_options.strict,
811 );
812 snapshot.describe_hash = Some(describe_hash);
813 snapshot.schema_hash = Some(schema_hash);
814 let snapshot = Arc::new(snapshot);
815 runtime
816 .contract_cache()
817 .insert(contract_key, Arc::clone(&snapshot));
818 snapshot
819 };
820 if !loaded_input_schema.is_null() {
821 let issues = validate_json_instance(
822 &loaded_input_schema,
823 &input_value,
824 validation_options.strict,
825 );
826 if !issues.is_empty() {
827 let diagnostics = schema_issues_to_diagnostics(
828 issues,
829 "/input",
830 component_ref,
831 &resolved_digest,
832 &op_id,
833 &locale,
834 );
835 return OperatorResponse::error_with_diagnostics(
836 OperatorErrorCode::TypeMismatch,
837 "input failed schema validation".to_string(),
838 diagnostics,
839 );
840 }
841 } else if validation_options.strict && binding.config_schema_ref.is_some() {
842 let message = format!(
843 "schema `{}` referenced by op `{}` was not found in pack",
844 binding.config_schema_ref.as_deref().unwrap_or("unknown"),
845 op_id
846 );
847 return OperatorResponse::error_with_diagnostics(
848 OperatorErrorCode::TypeMismatch,
849 message.clone(),
850 vec![diagnostic_error(
851 "schema_ref_not_found",
852 "/schema_hash",
853 "runner.operator.schema_ref_not_found",
854 message,
855 Some(op_id.as_str()),
856 Some(component_ref.as_str()),
857 Some(resolved_digest.as_str()),
858 &locale,
859 )],
860 );
861 }
862
863 if let Some(request_schema_hash) = request.schema_hash.as_deref()
864 && let Some(expected_schema_hash) = _contract_snapshot.schema_hash.as_deref()
865 {
866 let expected = normalize_sha256_hash(expected_schema_hash);
867 let provided = normalize_sha256_hash(request_schema_hash);
868 if expected != provided {
869 let message = format!(
870 "schema_hash mismatch for op `{}`: expected `{}`, got `{}`",
871 op_id, expected, provided
872 );
873 return OperatorResponse::error_with_diagnostics(
874 OperatorErrorCode::TypeMismatch,
875 message.clone(),
876 vec![diagnostic_error(
877 "schema_hash_mismatch",
878 "/schema_hash",
879 "runner.operator.schema_hash_mismatch",
880 message,
881 Some(op_id.as_str()),
882 Some(component_ref.as_str()),
883 Some(resolved_digest.as_str()),
884 &locale,
885 )],
886 );
887 }
888 }
889
890 let input_json = match serde_json::to_string(&input_value) {
891 Ok(json) => json,
892 Err(err) => {
893 return OperatorResponse::error(
894 OperatorErrorCode::TypeMismatch,
895 format!("failed to serialise input JSON: {err}"),
896 );
897 }
898 };
899
900 let exec_ctx = build_exec_ctx(&request, runtime, &op_id);
901 runtime
902 .operator_metrics()
903 .invoke_attempts
904 .fetch_add(1, Ordering::Relaxed);
905 let invoke_span = span!(Level::INFO, "invoke_component", component = %component_ref);
906 let _invoke_guard = invoke_span.enter();
907 let result = if provider_binding.world.starts_with("greentic:provider-core") {
908 let input_bytes = input_json.clone().into_bytes();
909 match pack
910 .invoke_provider(&provider_binding, exec_ctx, &invoke_op_id, input_bytes)
911 .await
912 {
913 Ok(value) => value,
914 Err(err) => {
915 runtime
916 .operator_metrics()
917 .invoke_errors
918 .fetch_add(1, Ordering::Relaxed);
919 return OperatorResponse::error(
920 OperatorErrorCode::HostFailure,
921 format!("provider invoke failed: {err}"),
922 );
923 }
924 }
925 } else {
926 match pack
927 .invoke_component(
928 component_ref,
929 exec_ctx,
930 &invoke_op_id,
931 provider_binding.config_json.clone(),
932 input_json.clone(),
933 )
934 .await
935 {
936 Ok(value) => value,
937 Err(err) => {
938 runtime
939 .operator_metrics()
940 .invoke_errors
941 .fetch_add(1, Ordering::Relaxed);
942 return OperatorResponse::error(
943 OperatorErrorCode::HostFailure,
944 format!("component invoke failed: {err}"),
945 );
946 }
947 }
948 };
949 drop(_invoke_guard);
950
951 if validation_options.validate_output
952 && let Some(output_ref) = derive_output_schema_ref(binding.config_schema_ref.as_deref())
953 && let Ok(Some(output_schema)) = pack.load_schema_json(&output_ref)
954 {
955 let output_value = result
956 .as_object()
957 .and_then(|obj| obj.get("output"))
958 .unwrap_or(&result);
959 let issues =
960 validate_json_instance(&output_schema, output_value, validation_options.strict);
961 if !issues.is_empty() {
962 let diagnostics = schema_issues_to_diagnostics(
963 issues,
964 "/output",
965 component_ref,
966 &resolved_digest,
967 &op_id,
968 &locale,
969 );
970 return OperatorResponse::error_with_diagnostics(
971 OperatorErrorCode::TypeMismatch,
972 "output failed schema validation".to_string(),
973 diagnostics,
974 );
975 }
976 }
977
978 if let Some(new_state) = result.as_object().and_then(|obj| obj.get("new_state")) {
979 if let Some(config_ref) = binding.config_schema_ref.as_deref() {
980 let config_schema = match pack.load_schema_json(config_ref) {
981 Ok(Some(schema)) => schema,
982 Ok(None) => {
983 let message = format!(
984 "config schema `{}` required for new_state validation was not found",
985 config_ref
986 );
987 return OperatorResponse::error_with_diagnostics(
988 OperatorErrorCode::TypeMismatch,
989 message.clone(),
990 vec![diagnostic_error(
991 "new_state_schema_missing",
992 "/new_state",
993 "runner.operator.new_state_schema_missing",
994 message,
995 Some(op_id.as_str()),
996 Some(component_ref.as_str()),
997 Some(resolved_digest.as_str()),
998 &locale,
999 )],
1000 );
1001 }
1002 Err(err) => {
1003 let message = format!(
1004 "failed to load config schema `{}` for new_state validation: {}",
1005 config_ref, err
1006 );
1007 return OperatorResponse::error_with_diagnostics(
1008 OperatorErrorCode::TypeMismatch,
1009 message.clone(),
1010 vec![diagnostic_error(
1011 "new_state_schema_load_failed",
1012 "/new_state",
1013 "runner.operator.new_state_schema_load_failed",
1014 message,
1015 Some(op_id.as_str()),
1016 Some(component_ref.as_str()),
1017 Some(resolved_digest.as_str()),
1018 &locale,
1019 )],
1020 );
1021 }
1022 };
1023 let issues =
1024 validate_json_instance(&config_schema, new_state, validation_options.strict);
1025 if !issues.is_empty() {
1026 let diagnostics = schema_issues_to_diagnostics(
1027 issues,
1028 "/new_state",
1029 component_ref,
1030 &resolved_digest,
1031 &op_id,
1032 &locale,
1033 );
1034 return OperatorResponse::error_with_diagnostics(
1035 OperatorErrorCode::TypeMismatch,
1036 "new_state failed schema validation".to_string(),
1037 diagnostics,
1038 );
1039 }
1040 } else if validation_options.strict {
1041 let message = "new_state returned but no config_schema_ref is available".to_string();
1042 return OperatorResponse::error_with_diagnostics(
1043 OperatorErrorCode::TypeMismatch,
1044 message.clone(),
1045 vec![diagnostic_error(
1046 "new_state_schema_unavailable",
1047 "/new_state",
1048 "runner.operator.new_state_schema_unavailable",
1049 message,
1050 Some(op_id.as_str()),
1051 Some(component_ref.as_str()),
1052 Some(resolved_digest.as_str()),
1053 &locale,
1054 )],
1055 );
1056 }
1057 }
1058
1059 let encode_span = span!(Level::DEBUG, "encode_cbor");
1060 let _encode_guard = encode_span.enter();
1061 let output_bytes = match serde_cbor::to_vec(&result) {
1062 Ok(bytes) => bytes,
1063 Err(err) => {
1064 return OperatorResponse::error(
1065 OperatorErrorCode::HostFailure,
1066 format!("failed to encode CBOR output: {err}"),
1067 );
1068 }
1069 };
1070 drop(_encode_guard);
1071
1072 OperatorResponse::ok(output_bytes)
1073}
1074
/// Picks the identifier reported as the component hint in resolve-error
/// diagnostics: the provider id when present, otherwise the provider type.
fn binding_component_ref_hint<'a>(
    provider_id: Option<&'a str>,
    provider_type: Option<&'a str>,
) -> Option<&'a str> {
    match provider_id {
        Some(id) => Some(id),
        None => provider_type,
    }
}
1081
1082pub async fn invoke_operator_cbor(
1084 runtime: &TenantRuntime,
1085 req_cbor: &[u8],
1086) -> Result<Vec<u8>, serde_cbor::Error> {
1087 let request = OperatorRequest::from_cbor(req_cbor)?;
1088 let response = invoke_operator(runtime, request).await;
1089 response.to_cbor()
1090}
1091
1092pub async fn invoke(
1094 TenantRuntimeHandle { runtime, .. }: TenantRuntimeHandle,
1095 _headers: HeaderMap,
1096 body: Body,
1097) -> Result<Response<Body>, Response<Body>> {
1098 let bytes = match to_bytes(body, usize::MAX).await {
1099 Ok(bytes) => bytes,
1100 Err(err) => {
1101 return Err(bad_request(format!("failed to read body: {err}")));
1102 }
1103 };
1104
1105 let request = match OperatorRequest::from_cbor(&bytes) {
1106 Ok(request) => request,
1107 Err(err) => {
1108 return Err(bad_request(format!("failed to decode request CBOR: {err}")));
1109 }
1110 };
1111
1112 let response = invoke_operator(&runtime, request).await;
1113 build_cbor_response(response)
1114}
1115
1116fn bad_request(message: String) -> Response<Body> {
1117 let payload = json!({ "error": message });
1118 Response::builder()
1119 .status(StatusCode::BAD_REQUEST)
1120 .header("content-type", "application/json")
1121 .body(Body::from(payload.to_string()))
1122 .expect("building JSON error response must succeed")
1123}
1124
1125#[allow(clippy::result_large_err)]
1126fn build_cbor_response(response: OperatorResponse) -> Result<Response<Body>, Response<Body>> {
1127 match response.to_cbor() {
1128 Ok(bytes) => Ok(Response::builder()
1129 .status(StatusCode::OK)
1130 .header("content-type", CONTENT_TYPE_CBOR)
1131 .body(Body::from(bytes))
1132 .expect("building CBOR response must succeed")),
1133 Err(err) => Err(bad_request(format!(
1134 "failed to serialize response CBOR: {err}"
1135 ))),
1136 }
1137}
1138
1139fn decode_request_payload(bytes: &[u8]) -> Result<Value, serde_cbor::Error> {
1140 if bytes.is_empty() {
1141 return Ok(Value::Null);
1142 }
1143 serde_cbor::from_slice(bytes)
1144}
1145
1146fn build_exec_ctx(
1147 request: &OperatorRequest,
1148 runtime: &TenantRuntime,
1149 operation_id: &str,
1150) -> ComponentExecCtx {
1151 let deadline_unix_ms = request.timeout.and_then(|timeout_ms| {
1152 SystemTime::now()
1153 .checked_add(Duration::from_millis(timeout_ms))
1154 .and_then(|deadline| deadline.duration_since(UNIX_EPOCH).ok())
1155 .map(|duration| duration.as_millis() as u64)
1156 });
1157
1158 let tenant_ctx = ComponentTenantCtx {
1159 tenant: runtime.config().tenant.clone(),
1160 team: None,
1161 user: None,
1162 trace_id: request.trace_id.clone(),
1163 i18n_id: None,
1164 correlation_id: request.correlation_id.clone(),
1165 deadline_unix_ms,
1166 attempt: 1,
1167 idempotency_key: request.correlation_id.clone(),
1168 };
1169
1170 ComponentExecCtx {
1171 tenant: tenant_ctx,
1172 i18n_id: None,
1173 flow_id: format!("operator/{operation_id}"),
1174 node_id: None,
1175 }
1176}
1177
1178fn resolve_attachments(
1179 payload: &OperatorPayload,
1180 runtime: &TenantRuntime,
1181) -> Result<Map<String, Value>, OperatorResponse> {
1182 let mut attachments = Map::new();
1183 for attachment in &payload.attachments {
1184 if let Some(kind) = AttachmentKind::from_metadata(attachment.metadata.as_ref()) {
1185 match kind {
1186 AttachmentKind::Secret { key, alias } => {
1187 let secret = runtime.get_secret(&key).map_err(|err| {
1188 OperatorResponse::error(
1189 OperatorErrorCode::PolicyDenied,
1190 format!("secret `{key}` access denied: {err}"),
1191 )
1192 })?;
1193 attachments.insert(alias, Value::String(secret));
1194 }
1195 }
1196 }
1197 }
1198 Ok(attachments)
1199}
1200
1201fn merge_input_with_attachments(input: Value, attachments: Map<String, Value>) -> Value {
1202 if attachments.is_empty() {
1203 return input;
1204 }
1205 match input {
1206 Value::Object(mut map) => {
1207 map.insert("_attachments".into(), Value::Object(attachments));
1208 Value::Object(map)
1209 }
1210 other => {
1211 let mut map = Map::new();
1212 map.insert("input".into(), other);
1213 map.insert("_attachments".into(), Value::Object(attachments));
1214 Value::Object(map)
1215 }
1216 }
1217}
1218
/// Attachment kinds recognized from attachment metadata.
enum AttachmentKind {
    /// A secret identified by `key` and exposed to the operator under `alias`.
    Secret { key: String, alias: String },
}
1222
1223impl AttachmentKind {
1224 fn from_metadata(metadata: Option<&Value>) -> Option<Self> {
1225 let metadata = metadata?.as_object()?;
1226 let attachment_type = metadata.get("type")?.as_str()?;
1227 match attachment_type {
1228 "secret" => {
1229 let key = metadata.get("key")?.as_str()?.to_string();
1230 let alias = metadata
1231 .get("alias")
1232 .and_then(Value::as_str)
1233 .map(|value| value.to_string())
1234 .unwrap_or_else(|| key.clone());
1235 Some(AttachmentKind::Secret { key, alias })
1236 }
1237 _ => None,
1238 }
1239 }
1240}
1241
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{Map, Value, json};

    /// Object inputs keep their own fields and gain an `_attachments` entry.
    #[test]
    fn merge_input_with_attachments_preserves_map_fields() {
        let mut attachments = Map::new();
        attachments.insert("secret".into(), json!("value"));
        let mut input_map = Map::new();
        input_map.insert("foo".into(), json!("bar"));
        let merged = merge_input_with_attachments(Value::Object(input_map), attachments.clone());
        let obj = merged.as_object().expect("should be object");
        assert_eq!(obj.get("foo"), Some(&json!("bar")));
        assert_eq!(obj.get("_attachments"), Some(&Value::Object(attachments)));
    }

    /// Non-object inputs are wrapped under `input` next to `_attachments`.
    #[test]
    fn merge_input_with_attachments_wraps_scalar() {
        let mut attachments = Map::new();
        attachments.insert("secret".into(), json!("value"));
        let merged =
            merge_input_with_attachments(Value::String("text".into()), attachments.clone());
        let obj = merged.as_object().expect("should be object");
        assert_eq!(obj.get("input"), Some(&Value::String("text".into())));
        assert_eq!(obj.get("_attachments"), Some(&Value::Object(attachments)));
    }

    /// Without an explicit `alias`, a secret attachment is aliased by its key.
    #[test]
    fn attachment_kind_secret_alias_defaults_to_key() {
        let metadata = json!({
            "type": "secret",
            "key": "TOKEN"
        });
        let Some(AttachmentKind::Secret { key, alias }) =
            AttachmentKind::from_metadata(Some(&metadata))
        else {
            panic!("expected secret attachment");
        };
        assert_eq!(key, "TOKEN");
        assert_eq!(alias, "TOKEN");
    }

    /// An explicit `alias` takes precedence over the key.
    #[test]
    fn attachment_kind_secret_with_alias() {
        let metadata = json!({
            "type": "secret",
            "key": "TOKEN",
            "alias": "api_token"
        });
        let Some(AttachmentKind::Secret { key, alias }) =
            AttachmentKind::from_metadata(Some(&metadata))
        else {
            panic!("expected secret attachment");
        };
        assert_eq!(key, "TOKEN");
        assert_eq!(alias, "api_token");
    }

    /// Diagnostics attached to an error response round-trip through
    /// `details_cbor`.
    #[test]
    fn error_with_diagnostics_encodes_details_cbor() {
        let diagnostics = vec![Diagnostic {
            code: "op_not_found".to_string(),
            path: "/op_id".to_string(),
            severity: DiagnosticSeverity::Error,
            message_key: "runner.operator.op_not_found".to_string(),
            fallback: "op `echo` not found".to_string(),
            message: "op `echo` not found".to_string(),
            hint: None,
            component_id: Some("provider.demo".to_string()),
            digest: Some("sha256:abc123".to_string()),
            operation_id: Some("echo".to_string()),
        }];

        let response = OperatorResponse::error_with_diagnostics(
            OperatorErrorCode::OpNotFound,
            "op not found",
            diagnostics.clone(),
        );
        let details = response
            .error
            .as_ref()
            .and_then(|err| err.details_cbor.as_ref())
            .expect("details_cbor must exist");
        let decoded: Vec<Diagnostic> =
            serde_cbor::from_slice(details).expect("diagnostics should decode");
        assert_eq!(decoded, diagnostics);
    }

    /// With no flags, output validation and strict mode are both enabled.
    #[test]
    fn validation_options_default_to_strict_with_output_validation() {
        let options = validation_options_from_flags(&[]);
        assert!(options.validate_output);
        assert!(options.strict);
    }

    /// The known flags switch off output validation and strict mode.
    #[test]
    fn validation_options_apply_known_flags() {
        let options = validation_options_from_flags(&[
            FLAG_SKIP_OUTPUT_VALIDATE.to_string(),
            FLAG_PERMISSIVE_SCHEMA.to_string(),
        ]);
        assert!(!options.validate_output);
        assert!(!options.strict);
    }

    /// Blank operation ids normalize to `run`; others are kept as-is.
    #[test]
    fn normalize_operation_defaults_to_run_when_blank() {
        assert_eq!(normalize_operation_id(""), "run");
        assert_eq!(normalize_operation_id("  "), "run");
        assert_eq!(normalize_operation_id("render"), "render");
    }

    /// Identical inputs must always produce identical contract hashes.
    #[test]
    fn compute_contract_hashes_is_deterministic() {
        let input_schema = json!({
            "type": "object",
            "properties": {
                "message": { "type": "string" }
            }
        });
        let output_schema = json!({
            "type": "object",
            "properties": {
                "result": { "type": "string" }
            }
        });
        let one = compute_contract_hashes(
            "sha256:abc",
            "provider.dummy",
            "echo",
            "greentic:provider-core@1.0.0",
            "provider-core",
            &input_schema,
            &output_schema,
            &input_schema,
            Some("schemas/state.schema.json"),
            "operator.provider@0.1.0",
        );
        let two = compute_contract_hashes(
            "sha256:abc",
            "provider.dummy",
            "echo",
            "greentic:provider-core@1.0.0",
            "provider-core",
            &input_schema,
            &output_schema,
            &input_schema,
            Some("schemas/state.schema.json"),
            "operator.provider@0.1.0",
        );
        assert_eq!(one, two);
    }
}