Skip to main content

alien_bindings/providers/function/kubernetes.rs

1use crate::{
2    error::{ErrorData, Result},
3    traits::{Binding, Function, FunctionInvokeRequest, FunctionInvokeResponse},
4};
5use alien_core::bindings::KubernetesFunctionBinding;
6use alien_error::{Context, IntoAlienError};
7use async_trait::async_trait;
8use std::collections::BTreeMap;
9
10/// Kubernetes Function implementation that calls functions via internal Kubernetes Services
11#[derive(Debug)]
12pub struct KubernetesFunction {
13    binding_name: String,
14    namespace: String,
15    service_name: String,
16    service_port: u16,
17    public_url: Option<String>,
18    http_client: reqwest::Client,
19}
20
21impl KubernetesFunction {
22    pub fn new(binding_name: String, binding: KubernetesFunctionBinding) -> Result<Self> {
23        let namespace = binding
24            .namespace
25            .into_value(&binding_name, "namespace")
26            .context(ErrorData::BindingConfigInvalid {
27                binding_name: binding_name.clone(),
28                reason: "Failed to extract namespace from Kubernetes function binding".to_string(),
29            })?;
30
31        let service_name = binding
32            .service_name
33            .into_value(&binding_name, "service_name")
34            .context(ErrorData::BindingConfigInvalid {
35                binding_name: binding_name.clone(),
36                reason: "Failed to extract service_name from Kubernetes function binding"
37                    .to_string(),
38            })?;
39
40        let service_port = binding
41            .service_port
42            .into_value(&binding_name, "service_port")
43            .context(ErrorData::BindingConfigInvalid {
44                binding_name: binding_name.clone(),
45                reason: "Failed to extract service_port from Kubernetes function binding"
46                    .to_string(),
47            })?;
48
49        let public_url = binding
50            .public_url
51            .map(|v| v.into_value(&binding_name, "public_url"))
52            .transpose()
53            .context(ErrorData::BindingConfigInvalid {
54                binding_name: binding_name.clone(),
55                reason: "Failed to extract public_url from Kubernetes function binding".to_string(),
56            })?;
57
58        Ok(Self {
59            binding_name,
60            namespace,
61            service_name,
62            service_port,
63            public_url,
64            http_client: reqwest::Client::new(),
65        })
66    }
67
68    /// Constructs the internal service URL for the function
69    fn get_internal_service_url(&self) -> String {
70        format!(
71            "http://{}.{}.svc.cluster.local:{}",
72            self.service_name, self.namespace, self.service_port
73        )
74    }
75}
76
// `Binding` is implemented with an empty body: no trait items are overridden here.
#[async_trait]
impl Binding for KubernetesFunction {}
79
80#[async_trait]
81impl Function for KubernetesFunction {
82    async fn invoke(&self, request: FunctionInvokeRequest) -> Result<FunctionInvokeResponse> {
83        // Construct the full URL
84        let base_url = self.get_internal_service_url();
85        let url = format!("{}{}", base_url, request.path);
86
87        // Build the HTTP request
88        let mut req_builder = self
89            .http_client
90            .request(
91                reqwest::Method::from_bytes(request.method.as_bytes())
92                    .into_alien_error()
93                    .context(ErrorData::CloudPlatformError {
94                        message: format!(
95                            "Failed to invoke Kubernetes function '{}': Invalid HTTP method",
96                            self.service_name
97                        ),
98                        resource_id: Some(self.service_name.clone()),
99                    })?,
100                &url,
101            )
102            .body(request.body);
103
104        // Add headers
105        for (key, value) in request.headers {
106            req_builder = req_builder.header(&key, &value);
107        }
108
109        // Set timeout if specified
110        if let Some(timeout) = request.timeout {
111            req_builder = req_builder.timeout(timeout);
112        }
113
114        // Execute the request
115        let response =
116            req_builder
117                .send()
118                .await
119                .into_alien_error()
120                .context(ErrorData::CloudPlatformError {
121                    message: format!(
122                        "Failed to invoke Kubernetes function '{}': HTTP request failed",
123                        self.service_name
124                    ),
125                    resource_id: Some(self.service_name.clone()),
126                })?;
127
128        // Extract response data
129        let status = response.status().as_u16();
130        let headers: BTreeMap<String, String> = response
131            .headers()
132            .iter()
133            .filter_map(|(k, v)| {
134                v.to_str()
135                    .ok()
136                    .map(|v| (k.as_str().to_string(), v.to_string()))
137            })
138            .collect();
139
140        let body = response
141            .bytes()
142            .await
143            .into_alien_error()
144            .context(ErrorData::CloudPlatformError {
145                message: format!(
146                    "Failed to invoke Kubernetes function '{}': Failed to read response body",
147                    self.service_name
148                ),
149                resource_id: Some(self.service_name.clone()),
150            })?
151            .to_vec();
152
153        Ok(FunctionInvokeResponse {
154            status,
155            headers,
156            body,
157        })
158    }
159
160    async fn get_function_url(&self) -> Result<Option<String>> {
161        // Return the public URL if configured in the binding
162        Ok(self.public_url.clone())
163    }
164
165    fn as_any(&self) -> &dyn std::any::Any {
166        self
167    }
168}