1use axum::{
2 body::{Body, to_bytes},
3 http::{HeaderMap, Response, StatusCode},
4};
5use serde::{Deserialize, Serialize};
6use serde_cbor;
7use serde_json::{Map, Value, json};
8use std::sync::atomic::Ordering;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use tracing::{Level, span};
12
13use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
14use crate::operator_registry::OperatorResolveError;
15use crate::provider::ProviderBinding;
16use crate::routing::TenantRuntimeHandle;
17use crate::runtime::TenantRuntime;
18
19const CONTENT_TYPE_CBOR: &str = "application/cbor";
20const OPERATOR_BODY_LIMIT: usize = 16 * 1024 * 1024;
21
22#[derive(Debug, Deserialize)]
24pub struct OperatorRequest {
25 #[serde(default)]
26 pub tenant_id: Option<String>,
27 #[serde(default)]
28 pub provider_id: Option<String>,
29 #[serde(default)]
30 pub provider_type: Option<String>,
31 #[serde(default)]
32 pub pack_id: Option<String>,
33 pub op_id: String,
34 #[serde(default)]
35 pub trace_id: Option<String>,
36 #[serde(default)]
37 pub correlation_id: Option<String>,
38 #[serde(default)]
39 pub timeout: Option<u64>,
40 #[serde(default)]
41 pub flags: Vec<String>,
42 #[serde(default)]
43 pub op_version: Option<String>,
44 #[serde(default)]
45 pub schema_hash: Option<String>,
46 pub payload: OperatorPayload,
47}
48
49impl OperatorRequest {
50 pub fn from_cbor(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
51 serde_cbor::from_slice(bytes)
52 }
53}
54
55#[derive(Debug, Deserialize)]
56pub struct OperatorPayload {
57 #[serde(default)]
58 #[serde(rename = "cbor_input")]
59 pub cbor_input: Vec<u8>,
60 #[serde(default)]
61 pub attachments: Vec<AttachmentRef>,
62}
63
64#[derive(Debug, Deserialize)]
65pub struct AttachmentRef {
66 pub id: String,
67 #[serde(default)]
68 pub metadata: Option<Value>,
69}
70
71#[derive(Debug, Serialize)]
73pub struct OperatorResponse {
74 pub status: OperatorStatus,
75 #[serde(skip_serializing_if = "Option::is_none")]
76 pub cbor_output: Option<Vec<u8>>,
77 #[serde(skip_serializing_if = "Option::is_none")]
78 pub error: Option<OperatorError>,
79}
80
81impl OperatorResponse {
82 pub fn ok(output: Vec<u8>) -> Self {
83 Self {
84 status: OperatorStatus::Ok,
85 cbor_output: Some(output),
86 error: None,
87 }
88 }
89
90 pub fn error(code: OperatorErrorCode, message: impl Into<String>) -> Self {
91 Self {
92 status: OperatorStatus::Error,
93 cbor_output: None,
94 error: Some(OperatorError {
95 code,
96 message: message.into(),
97 details_cbor: None,
98 }),
99 }
100 }
101
102 pub fn to_cbor(&self) -> Result<Vec<u8>, serde_cbor::Error> {
103 serde_cbor::ser::to_vec_packed(self)
104 }
105}
106
107#[derive(Debug, Serialize)]
108pub struct OperatorError {
109 pub code: OperatorErrorCode,
110 pub message: String,
111 #[serde(skip_serializing_if = "Option::is_none")]
112 pub details_cbor: Option<Vec<u8>>,
113}
114
115#[derive(Debug, Serialize)]
116pub enum OperatorStatus {
117 Ok,
118 Error,
119}
120
121#[derive(Debug, Serialize)]
122#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
123pub enum OperatorErrorCode {
124 OpNotFound,
125 ProviderNotFound,
126 TenantNotAllowed,
127 InvalidRequest,
128 CborDecode,
129 TypeMismatch,
130 ComponentLoad,
131 InvokeTrap,
132 Timeout,
133 PolicyDenied,
134 HostFailure,
135}
136
137impl OperatorErrorCode {
138 pub fn reason(&self) -> &'static str {
139 match self {
140 OperatorErrorCode::OpNotFound => "op not found",
141 OperatorErrorCode::ProviderNotFound => "provider not found",
142 OperatorErrorCode::TenantNotAllowed => "tenant not allowed",
143 OperatorErrorCode::InvalidRequest => "invalid operator request",
144 OperatorErrorCode::CborDecode => "failed to decode CBOR payload",
145 OperatorErrorCode::TypeMismatch => "type mismatch between CBOR and operation",
146 OperatorErrorCode::ComponentLoad => "failed to load component",
147 OperatorErrorCode::InvokeTrap => "component trapped during invoke",
148 OperatorErrorCode::Timeout => "invocation timed out",
149 OperatorErrorCode::PolicyDenied => "policy denied the operation",
150 OperatorErrorCode::HostFailure => "internal host failure",
151 }
152 }
153}
154
155pub async fn invoke_operator(
157 runtime: &TenantRuntime,
158 request: OperatorRequest,
159) -> OperatorResponse {
160 if let Some(request_tenant) = request.tenant_id.as_deref()
161 && request_tenant != runtime.tenant()
162 {
163 return OperatorResponse::error(
164 OperatorErrorCode::TenantNotAllowed,
165 format!(
166 "tenant mismatch: routing resolved `{}` but request wants `{request_tenant}`",
167 runtime.tenant(),
168 ),
169 );
170 }
171
172 if request.provider_id.is_none() && request.provider_type.is_none() {
173 return OperatorResponse::error(
174 OperatorErrorCode::InvalidRequest,
175 "operator invoke requires provider_id or provider_type".to_string(),
176 );
177 }
178
179 let tenant = runtime.tenant();
180 let root_span = span!(
181 Level::INFO,
182 "operator.invoke",
183 tenant = %tenant,
184 op_id = %request.op_id,
185 provider_id = ?request.provider_id,
186 provider_type = ?request.provider_type
187 );
188 let _root_guard = root_span.enter();
189
190 let provider_id = request.provider_id.as_deref();
191 let provider_type = request.provider_type.as_deref();
192 runtime
193 .operator_metrics()
194 .resolve_attempts
195 .fetch_add(1, Ordering::Relaxed);
196 let resolve_span = span!(Level::DEBUG, "resolve_op");
197 let _resolve_guard = resolve_span.enter();
198 let binding =
199 match runtime
200 .operator_registry()
201 .resolve(provider_id, provider_type, &request.op_id)
202 {
203 Ok(binding) => binding,
204 Err(err) => {
205 let (code, message) = match err {
206 OperatorResolveError::ProviderNotFound => {
207 let label = provider_id.or(provider_type).unwrap_or("unknown");
208 (
209 OperatorErrorCode::ProviderNotFound,
210 format!("provider `{label}` not registered"),
211 )
212 }
213 OperatorResolveError::OpNotFound => {
214 let label = provider_id.or(provider_type).unwrap_or("unknown provider");
215 (
216 OperatorErrorCode::OpNotFound,
217 format!("op `{}` not found for provider `{label}`", &request.op_id),
218 )
219 }
220 };
221 runtime
222 .operator_metrics()
223 .resolve_errors
224 .fetch_add(1, Ordering::Relaxed);
225 let response = OperatorResponse::error(code, message);
226 return response;
227 }
228 };
229 drop(_resolve_guard);
230
231 let policy = &runtime.config().operator_policy;
232 if !policy.allows_provider(provider_id, binding.provider_type.as_str()) {
233 return OperatorResponse::error(
234 OperatorErrorCode::PolicyDenied,
235 format!(
236 "provider `{}` not allowed for tenant {}",
237 binding
238 .provider_id
239 .as_deref()
240 .unwrap_or(&binding.provider_type),
241 runtime.config().tenant
242 ),
243 );
244 }
245 if !policy.allows_op(provider_id, binding.provider_type.as_str(), &binding.op_id) {
246 return OperatorResponse::error(
247 OperatorErrorCode::PolicyDenied,
248 format!(
249 "op `{}` is not permitted for provider `{}` on tenant {}",
250 binding.op_id,
251 binding
252 .provider_id
253 .as_deref()
254 .unwrap_or(&binding.provider_type),
255 runtime.config().tenant
256 ),
257 );
258 }
259
260 if let Some(req_pack) = request.pack_id.as_deref() {
261 let binding_pack = binding
262 .pack_ref
263 .split('@')
264 .next()
265 .unwrap_or(&binding.pack_ref);
266 if binding_pack != req_pack {
267 return OperatorResponse::error(
268 OperatorErrorCode::PolicyDenied,
269 format!(
270 "request bound to pack `{req_pack}`, but op lives in `{}`",
271 binding.pack_ref
272 ),
273 );
274 }
275 }
276
277 let attachments = match resolve_attachments(&request.payload, runtime) {
278 Ok(map) => map,
279 Err(response) => return response,
280 };
281
282 let decode_span = span!(Level::DEBUG, "decode_cbor");
283 let _decode_guard = decode_span.enter();
284 let input_value = match decode_request_payload(&request.payload.cbor_input) {
285 Ok(value) => value,
286 Err(err) => {
287 runtime
288 .operator_metrics()
289 .cbor_decode_errors
290 .fetch_add(1, Ordering::Relaxed);
291 return OperatorResponse::error(OperatorErrorCode::CborDecode, format!("{err}"));
292 }
293 };
294 drop(_decode_guard);
295
296 let input_value = merge_input_with_attachments(input_value, attachments);
297
298 let input_json = match serde_json::to_string(&input_value) {
299 Ok(json) => json,
300 Err(err) => {
301 return OperatorResponse::error(
302 OperatorErrorCode::TypeMismatch,
303 format!("failed to serialise input JSON: {err}"),
304 );
305 }
306 };
307
308 let component_ref = &binding.runtime.component_ref;
309 let pack = match runtime.pack_for_component(component_ref) {
310 Some(pack) => pack,
311 None => {
312 return OperatorResponse::error(
313 OperatorErrorCode::ComponentLoad,
314 format!("component `{}` not found in tenant packs", component_ref),
315 );
316 }
317 };
318
319 let exec_ctx = build_exec_ctx(&request, runtime);
320 runtime
321 .operator_metrics()
322 .invoke_attempts
323 .fetch_add(1, Ordering::Relaxed);
324 let invoke_span = span!(Level::INFO, "invoke_component", component = %component_ref);
325 let _invoke_guard = invoke_span.enter();
326 let result = if binding.runtime.world.starts_with("greentic:provider-core") {
327 let input_bytes = input_json.clone().into_bytes();
328 let provider_binding = ProviderBinding {
329 provider_id: binding.provider_id.clone(),
330 provider_type: binding.provider_type.clone(),
331 component_ref: binding.runtime.component_ref.clone(),
332 export: binding.runtime.export.clone(),
333 world: binding.runtime.world.clone(),
334 config_json: None,
335 pack_ref: Some(binding.pack_ref.clone()),
336 };
337 match pack
338 .invoke_provider(&provider_binding, exec_ctx, &binding.op_id, input_bytes)
339 .await
340 {
341 Ok(value) => value,
342 Err(err) => {
343 runtime
344 .operator_metrics()
345 .invoke_errors
346 .fetch_add(1, Ordering::Relaxed);
347 return OperatorResponse::error(
348 OperatorErrorCode::HostFailure,
349 format!("provider invoke failed: {err}"),
350 );
351 }
352 }
353 } else {
354 match pack
355 .invoke_component(
356 component_ref,
357 exec_ctx,
358 &binding.op_id,
359 None,
360 input_json.clone(),
361 )
362 .await
363 {
364 Ok(value) => value,
365 Err(err) => {
366 runtime
367 .operator_metrics()
368 .invoke_errors
369 .fetch_add(1, Ordering::Relaxed);
370 return OperatorResponse::error(
371 OperatorErrorCode::HostFailure,
372 format!("component invoke failed: {err}"),
373 );
374 }
375 }
376 };
377 drop(_invoke_guard);
378
379 let encode_span = span!(Level::DEBUG, "encode_cbor");
380 let _encode_guard = encode_span.enter();
381 let output_bytes = match serde_cbor::to_vec(&result) {
382 Ok(bytes) => bytes,
383 Err(err) => {
384 return OperatorResponse::error(
385 OperatorErrorCode::HostFailure,
386 format!("failed to encode CBOR output: {err}"),
387 );
388 }
389 };
390 drop(_encode_guard);
391
392 OperatorResponse::ok(output_bytes)
393}
394
395pub async fn invoke_operator_cbor(
397 runtime: &TenantRuntime,
398 req_cbor: &[u8],
399) -> Result<Vec<u8>, serde_cbor::Error> {
400 let request = OperatorRequest::from_cbor(req_cbor)?;
401 let response = invoke_operator(runtime, request).await;
402 response.to_cbor()
403}
404
405pub async fn invoke(
407 TenantRuntimeHandle { runtime, .. }: TenantRuntimeHandle,
408 _headers: HeaderMap,
409 body: Body,
410) -> Result<Response<Body>, Response<Body>> {
411 let bytes = match to_bytes(body, OPERATOR_BODY_LIMIT).await {
412 Ok(bytes) => bytes,
413 Err(err) => {
414 return Err(bad_request(format!("failed to read body: {err}")));
415 }
416 };
417
418 let request = match OperatorRequest::from_cbor(&bytes) {
419 Ok(request) => request,
420 Err(err) => {
421 return Err(bad_request(format!("failed to decode request CBOR: {err}")));
422 }
423 };
424
425 let response = invoke_operator(&runtime, request).await;
426 build_cbor_response(response)
427}
428
429fn bad_request(message: String) -> Response<Body> {
430 let payload = json!({ "error": message });
431 Response::builder()
432 .status(StatusCode::BAD_REQUEST)
433 .header("content-type", "application/json")
434 .body(Body::from(payload.to_string()))
435 .expect("building JSON error response must succeed")
436}
437
438#[allow(clippy::result_large_err)]
439fn build_cbor_response(response: OperatorResponse) -> Result<Response<Body>, Response<Body>> {
440 match response.to_cbor() {
441 Ok(bytes) => Ok(Response::builder()
442 .status(StatusCode::OK)
443 .header("content-type", CONTENT_TYPE_CBOR)
444 .body(Body::from(bytes))
445 .expect("building CBOR response must succeed")),
446 Err(err) => Err(bad_request(format!(
447 "failed to serialize response CBOR: {err}"
448 ))),
449 }
450}
451
452fn decode_request_payload(bytes: &[u8]) -> Result<Value, serde_cbor::Error> {
453 if bytes.is_empty() {
454 return Ok(Value::Null);
455 }
456 serde_cbor::from_slice(bytes)
457}
458
459fn build_exec_ctx(request: &OperatorRequest, runtime: &TenantRuntime) -> ComponentExecCtx {
460 let deadline_unix_ms = request.timeout.and_then(|timeout_ms| {
461 SystemTime::now()
462 .checked_add(Duration::from_millis(timeout_ms))
463 .and_then(|deadline| deadline.duration_since(UNIX_EPOCH).ok())
464 .map(|duration| duration.as_millis() as u64)
465 });
466
467 let tenant_ctx = ComponentTenantCtx {
468 tenant: runtime.config().tenant.clone(),
469 team: None,
470 user: None,
471 trace_id: request.trace_id.clone(),
472 correlation_id: request.correlation_id.clone(),
473 deadline_unix_ms,
474 attempt: 1,
475 idempotency_key: request.correlation_id.clone(),
476 };
477
478 ComponentExecCtx {
479 tenant: tenant_ctx,
480 flow_id: format!("operator/{}", request.op_id),
481 node_id: None,
482 }
483}
484
485fn resolve_attachments(
486 payload: &OperatorPayload,
487 runtime: &TenantRuntime,
488) -> Result<Map<String, Value>, OperatorResponse> {
489 let mut attachments = Map::new();
490 for attachment in &payload.attachments {
491 if let Some(kind) = AttachmentKind::from_metadata(attachment.metadata.as_ref()) {
492 match kind {
493 AttachmentKind::Secret { key, alias } => {
494 let secret = runtime.get_secret(&key).map_err(|err| {
495 OperatorResponse::error(
496 OperatorErrorCode::PolicyDenied,
497 format!("secret `{key}` access denied: {err}"),
498 )
499 })?;
500 attachments.insert(alias, Value::String(secret));
501 }
502 }
503 }
504 }
505 Ok(attachments)
506}
507
508fn merge_input_with_attachments(input: Value, attachments: Map<String, Value>) -> Value {
509 if attachments.is_empty() {
510 return input;
511 }
512 match input {
513 Value::Object(mut map) => {
514 map.insert("_attachments".into(), Value::Object(attachments));
515 Value::Object(map)
516 }
517 other => {
518 let mut map = Map::new();
519 map.insert("input".into(), other);
520 map.insert("_attachments".into(), Value::Object(attachments));
521 Value::Object(map)
522 }
523 }
524}
525
526enum AttachmentKind {
527 Secret { key: String, alias: String },
528}
529
530impl AttachmentKind {
531 fn from_metadata(metadata: Option<&Value>) -> Option<Self> {
532 let metadata = metadata?.as_object()?;
533 let attachment_type = metadata.get("type")?.as_str()?;
534 match attachment_type {
535 "secret" => {
536 let key = metadata.get("key")?.as_str()?.to_string();
537 let alias = metadata
538 .get("alias")
539 .and_then(Value::as_str)
540 .map(|value| value.to_string())
541 .unwrap_or_else(|| key.clone());
542 Some(AttachmentKind::Secret { key, alias })
543 }
544 _ => None,
545 }
546 }
547}
548
549#[cfg(test)]
550mod tests {
551 use super::*;
552 use serde_json::{Map, Value, json};
553
554 #[test]
555 fn merge_input_with_attachments_preserves_map_fields() {
556 let mut attachments = Map::new();
557 attachments.insert("secret".into(), json!("value"));
558 let mut input_map = Map::new();
559 input_map.insert("foo".into(), json!("bar"));
560 let merged = merge_input_with_attachments(Value::Object(input_map), attachments.clone());
561 let obj = merged.as_object().expect("should be object");
562 assert_eq!(obj.get("foo"), Some(&json!("bar")));
563 assert_eq!(obj.get("_attachments"), Some(&Value::Object(attachments)));
564 }
565
566 #[test]
567 fn merge_input_with_attachments_wraps_scalar() {
568 let mut attachments = Map::new();
569 attachments.insert("secret".into(), json!("value"));
570 let merged =
571 merge_input_with_attachments(Value::String("text".into()), attachments.clone());
572 let obj = merged.as_object().expect("should be object");
573 assert_eq!(obj.get("input"), Some(&Value::String("text".into())));
574 assert_eq!(obj.get("_attachments"), Some(&Value::Object(attachments)));
575 }
576
577 #[test]
578 fn attachment_kind_secret_requires_type_lock() {
579 let metadata = json!({
580 "type": "secret",
581 "key": "TOKEN"
582 });
583 if let Some(AttachmentKind::Secret { key, alias }) =
584 AttachmentKind::from_metadata(Some(&metadata))
585 {
586 assert_eq!(key, "TOKEN");
587 assert_eq!(alias, "TOKEN");
588 } else {
589 panic!("expected secret attachment");
590 }
591 }
592
593 #[test]
594 fn attachment_kind_secret_with_alias() {
595 let metadata = json!({
596 "type": "secret",
597 "key": "TOKEN",
598 "alias": "api_token"
599 });
600 if let Some(AttachmentKind::Secret { key, alias }) =
601 AttachmentKind::from_metadata(Some(&metadata))
602 {
603 assert_eq!(key, "TOKEN");
604 assert_eq!(alias, "api_token");
605 } else {
606 panic!("expected secret attachment");
607 }
608 }
609}