Skip to main content

alien_bindings/providers/worker/
kubernetes.rs

1use crate::{
2    error::{ErrorData, Result},
3    traits::{Binding, Worker, WorkerInvokeRequest, WorkerInvokeResponse},
4};
5use alien_core::bindings::KubernetesWorkerBinding;
6use alien_error::{Context, IntoAlienError};
7use async_trait::async_trait;
8use std::collections::BTreeMap;
9
10/// Kubernetes Worker implementation that calls workers via internal Kubernetes Services
11#[derive(Debug)]
12pub struct KubernetesWorker {
13    namespace: String,
14    service_name: String,
15    service_port: u16,
16    public_url: Option<String>,
17    http_client: reqwest::Client,
18}
19
20impl KubernetesWorker {
21    pub fn new(binding_name: String, binding: KubernetesWorkerBinding) -> Result<Self> {
22        let namespace = binding
23            .namespace
24            .into_value(&binding_name, "namespace")
25            .context(ErrorData::BindingConfigInvalid {
26                binding_name: binding_name.clone(),
27                reason: "Failed to extract namespace from Kubernetes worker binding".to_string(),
28            })?;
29
30        let service_name = binding
31            .service_name
32            .into_value(&binding_name, "service_name")
33            .context(ErrorData::BindingConfigInvalid {
34                binding_name: binding_name.clone(),
35                reason: "Failed to extract service_name from Kubernetes worker binding".to_string(),
36            })?;
37
38        let service_port = binding
39            .service_port
40            .into_value(&binding_name, "service_port")
41            .context(ErrorData::BindingConfigInvalid {
42                binding_name: binding_name.clone(),
43                reason: "Failed to extract service_port from Kubernetes worker binding".to_string(),
44            })?;
45
46        let public_url = binding
47            .public_url
48            .map(|v| v.into_value(&binding_name, "public_url"))
49            .transpose()
50            .context(ErrorData::BindingConfigInvalid {
51                binding_name: binding_name.clone(),
52                reason: "Failed to extract public_url from Kubernetes worker binding".to_string(),
53            })?;
54
55        Ok(Self {
56            namespace,
57            service_name,
58            service_port,
59            public_url,
60            http_client: reqwest::Client::new(),
61        })
62    }
63
64    /// Constructs the internal service URL for the worker
65    fn get_internal_service_url(&self) -> String {
66        format!(
67            "http://{}.{}.svc.cluster.local:{}",
68            self.service_name, self.namespace, self.service_port
69        )
70    }
71}
72
73#[async_trait]
74impl Binding for KubernetesWorker {}
75
76#[async_trait]
77impl Worker for KubernetesWorker {
78    async fn invoke(&self, request: WorkerInvokeRequest) -> Result<WorkerInvokeResponse> {
79        // Construct the full URL
80        let base_url = self.get_internal_service_url();
81        let url = format!("{}{}", base_url, request.path);
82
83        // Build the HTTP request
84        let mut req_builder = self
85            .http_client
86            .request(
87                reqwest::Method::from_bytes(request.method.as_bytes())
88                    .into_alien_error()
89                    .context(ErrorData::CloudPlatformError {
90                        message: format!(
91                            "Failed to invoke Kubernetes worker '{}': Invalid HTTP method",
92                            self.service_name
93                        ),
94                        resource_id: Some(self.service_name.clone()),
95                    })?,
96                &url,
97            )
98            .body(request.body);
99
100        // Add headers
101        for (key, value) in request.headers {
102            req_builder = req_builder.header(&key, &value);
103        }
104
105        // Set timeout if specified
106        if let Some(timeout) = request.timeout {
107            req_builder = req_builder.timeout(timeout);
108        }
109
110        // Execute the request
111        let response =
112            req_builder
113                .send()
114                .await
115                .into_alien_error()
116                .context(ErrorData::CloudPlatformError {
117                    message: format!(
118                        "Failed to invoke Kubernetes worker '{}': HTTP request failed",
119                        self.service_name
120                    ),
121                    resource_id: Some(self.service_name.clone()),
122                })?;
123
124        // Extract response data
125        let status = response.status().as_u16();
126        let headers: BTreeMap<String, String> = response
127            .headers()
128            .iter()
129            .filter_map(|(k, v)| {
130                v.to_str()
131                    .ok()
132                    .map(|v| (k.as_str().to_string(), v.to_string()))
133            })
134            .collect();
135
136        let body = response
137            .bytes()
138            .await
139            .into_alien_error()
140            .context(ErrorData::CloudPlatformError {
141                message: format!(
142                    "Failed to invoke Kubernetes worker '{}': Failed to read response body",
143                    self.service_name
144                ),
145                resource_id: Some(self.service_name.clone()),
146            })?
147            .to_vec();
148
149        Ok(WorkerInvokeResponse {
150            status,
151            headers,
152            body,
153        })
154    }
155
156    async fn get_worker_url(&self) -> Result<Option<String>> {
157        // Return the public URL if configured in the binding
158        Ok(self.public_url.clone())
159    }
160
161    fn as_any(&self) -> &dyn std::any::Any {
162        self
163    }
164}