alien_bindings/providers/worker/
kubernetes.rs1use crate::{
2 error::{ErrorData, Result},
3 traits::{Binding, Worker, WorkerInvokeRequest, WorkerInvokeResponse},
4};
5use alien_core::bindings::KubernetesWorkerBinding;
6use alien_error::{Context, IntoAlienError};
7use async_trait::async_trait;
8use std::collections::BTreeMap;
9
10#[derive(Debug)]
12pub struct KubernetesWorker {
13 namespace: String,
14 service_name: String,
15 service_port: u16,
16 public_url: Option<String>,
17 http_client: reqwest::Client,
18}
19
20impl KubernetesWorker {
21 pub fn new(binding_name: String, binding: KubernetesWorkerBinding) -> Result<Self> {
22 let namespace = binding
23 .namespace
24 .into_value(&binding_name, "namespace")
25 .context(ErrorData::BindingConfigInvalid {
26 binding_name: binding_name.clone(),
27 reason: "Failed to extract namespace from Kubernetes worker binding".to_string(),
28 })?;
29
30 let service_name = binding
31 .service_name
32 .into_value(&binding_name, "service_name")
33 .context(ErrorData::BindingConfigInvalid {
34 binding_name: binding_name.clone(),
35 reason: "Failed to extract service_name from Kubernetes worker binding".to_string(),
36 })?;
37
38 let service_port = binding
39 .service_port
40 .into_value(&binding_name, "service_port")
41 .context(ErrorData::BindingConfigInvalid {
42 binding_name: binding_name.clone(),
43 reason: "Failed to extract service_port from Kubernetes worker binding".to_string(),
44 })?;
45
46 let public_url = binding
47 .public_url
48 .map(|v| v.into_value(&binding_name, "public_url"))
49 .transpose()
50 .context(ErrorData::BindingConfigInvalid {
51 binding_name: binding_name.clone(),
52 reason: "Failed to extract public_url from Kubernetes worker binding".to_string(),
53 })?;
54
55 Ok(Self {
56 namespace,
57 service_name,
58 service_port,
59 public_url,
60 http_client: reqwest::Client::new(),
61 })
62 }
63
64 fn get_internal_service_url(&self) -> String {
66 format!(
67 "http://{}.{}.svc.cluster.local:{}",
68 self.service_name, self.namespace, self.service_port
69 )
70 }
71}
72
73#[async_trait]
74impl Binding for KubernetesWorker {}
75
76#[async_trait]
77impl Worker for KubernetesWorker {
78 async fn invoke(&self, request: WorkerInvokeRequest) -> Result<WorkerInvokeResponse> {
79 let base_url = self.get_internal_service_url();
81 let url = format!("{}{}", base_url, request.path);
82
83 let mut req_builder = self
85 .http_client
86 .request(
87 reqwest::Method::from_bytes(request.method.as_bytes())
88 .into_alien_error()
89 .context(ErrorData::CloudPlatformError {
90 message: format!(
91 "Failed to invoke Kubernetes worker '{}': Invalid HTTP method",
92 self.service_name
93 ),
94 resource_id: Some(self.service_name.clone()),
95 })?,
96 &url,
97 )
98 .body(request.body);
99
100 for (key, value) in request.headers {
102 req_builder = req_builder.header(&key, &value);
103 }
104
105 if let Some(timeout) = request.timeout {
107 req_builder = req_builder.timeout(timeout);
108 }
109
110 let response =
112 req_builder
113 .send()
114 .await
115 .into_alien_error()
116 .context(ErrorData::CloudPlatformError {
117 message: format!(
118 "Failed to invoke Kubernetes worker '{}': HTTP request failed",
119 self.service_name
120 ),
121 resource_id: Some(self.service_name.clone()),
122 })?;
123
124 let status = response.status().as_u16();
126 let headers: BTreeMap<String, String> = response
127 .headers()
128 .iter()
129 .filter_map(|(k, v)| {
130 v.to_str()
131 .ok()
132 .map(|v| (k.as_str().to_string(), v.to_string()))
133 })
134 .collect();
135
136 let body = response
137 .bytes()
138 .await
139 .into_alien_error()
140 .context(ErrorData::CloudPlatformError {
141 message: format!(
142 "Failed to invoke Kubernetes worker '{}': Failed to read response body",
143 self.service_name
144 ),
145 resource_id: Some(self.service_name.clone()),
146 })?
147 .to_vec();
148
149 Ok(WorkerInvokeResponse {
150 status,
151 headers,
152 body,
153 })
154 }
155
156 async fn get_worker_url(&self) -> Result<Option<String>> {
157 Ok(self.public_url.clone())
159 }
160
161 fn as_any(&self) -> &dyn std::any::Any {
162 self
163 }
164}