Skip to main content

alien_bindings/providers/
grpc_provider.rs

1use crate::{
2    error::{Error, ErrorData},
3    providers::{
4        artifact_registry::grpc::GrpcArtifactRegistry, build::grpc::GrpcBuild,
5        function::grpc::GrpcFunction, kv::grpc::GrpcKv, queue::grpc::GrpcQueue,
6        service_account::grpc::GrpcServiceAccount, storage::grpc::GrpcStorage,
7        vault::grpc::GrpcVault,
8    },
9    traits::{
10        ArtifactRegistry, BindingsProviderApi, Build, Container, Function, Kv, Queue,
11        ServiceAccount, Storage, Vault,
12    },
13};
14use alien_error::{AlienError, Context, IntoAlienError};
15use async_trait::async_trait;
16use std::{collections::HashMap, sync::Arc};
17use tonic::transport::Channel;
18
19/// Creates a gRPC channel from an address string, handling URI scheme normalization
20/// and connection establishment with proper error handling.
21pub async fn create_grpc_channel(grpc_address: String) -> crate::error::Result<Channel> {
22    tracing::debug!(
23        "create_grpc_channel: Connecting to gRPC server at: {}",
24        grpc_address
25    );
26
27    // Ensure the address has a scheme, default to http if not present
28    let endpoint_uri = if grpc_address.contains("://") {
29        grpc_address.clone()
30    } else {
31        format!("http://{}", grpc_address)
32    };
33
34    tracing::debug!("create_grpc_channel: Endpoint URI: {}", endpoint_uri);
35
36    let endpoint = Channel::from_shared(endpoint_uri.clone())
37        .into_alien_error()
38        .context(ErrorData::GrpcConnectionFailed {
39            endpoint: endpoint_uri.clone(),
40            reason: "Invalid gRPC endpoint URI format".to_string(),
41        })?
42        .timeout(std::time::Duration::from_secs(30)) // Request timeout
43        .connect_timeout(std::time::Duration::from_secs(5)) // Connection establishment timeout
44        .http2_keep_alive_interval(std::time::Duration::from_secs(30)) // Send keep-alive pings
45        .keep_alive_timeout(std::time::Duration::from_secs(10)) // Keep-alive response timeout
46        .keep_alive_while_idle(true); // Keep connection alive even when idle
47
48    tracing::debug!("create_grpc_channel: Attempting to connect to endpoint");
49    let channel =
50        endpoint
51            .connect()
52            .await
53            .into_alien_error()
54            .context(ErrorData::GrpcConnectionFailed {
55                endpoint: grpc_address.clone(),
56                reason: "Failed to establish gRPC connection".to_string(),
57            })?;
58
59    tracing::debug!(
60        "create_grpc_channel: Successfully connected to {}",
61        grpc_address
62    );
63    Ok(channel)
64}
65
66/// gRPC implementation of the `BindingsProvider` trait.
67///
68/// This provider connects to a gRPC endpoint specified by the
69/// `ALIEN_BINDINGS_GRPC_ADDRESS` environment variable.
70///
71/// Uses a shared gRPC channel to avoid file descriptor exhaustion.
72#[derive(Debug, Clone)]
73pub struct GrpcBindingsProvider {
74    env: HashMap<String, String>,
75    /// Shared gRPC channel, created lazily on first use
76    channel: Arc<tokio::sync::OnceCell<Channel>>,
77}
78
79impl GrpcBindingsProvider {
80    /// Creates a new provider that reads environment variables from `std::env`.
81    /// This is disabled on wasm32 targets.
82    #[cfg(not(target_arch = "wasm32"))]
83    pub fn new() -> Result<Self, Error> {
84        Self::new_with_env(std::env::vars().collect())
85    }
86
87    /// Creates a new provider with an empty environment map.
88    /// This is the default constructor on wasm32 targets.
89    #[cfg(target_arch = "wasm32")]
90    pub fn new() -> Result<Self, Error> {
91        Self::new_with_env(HashMap::new())
92    }
93
94    /// Creates a new provider that reads environment variables from the provided map.
95    pub fn new_with_env(env: HashMap<String, String>) -> Result<Self, Error> {
96        Ok(Self {
97            env,
98            channel: Arc::new(tokio::sync::OnceCell::new()),
99        })
100    }
101
102    // Helper function to get environment variable, returning crate::Error
103    fn get_env_var(&self, var_name: &str) -> Result<String, Error> {
104        self.env.get(var_name).cloned().ok_or_else(|| {
105            AlienError::new(ErrorData::EnvironmentVariableMissing {
106                variable_name: var_name.to_string(),
107            })
108        })
109    }
110
111    // Helper function to get gRPC address for any binding
112    fn get_grpc_address(&self) -> Result<String, Error> {
113        self.get_env_var("ALIEN_BINDINGS_GRPC_ADDRESS")
114    }
115
116    /// Get or create the shared gRPC channel.
117    /// This ensures we only have ONE channel per GrpcBindingsProvider instance,
118    /// preventing file descriptor exhaustion.
119    async fn get_channel(&self) -> Result<Channel, Error> {
120        let channel = self
121            .channel
122            .get_or_try_init(|| async {
123                let grpc_address = self.get_grpc_address()?;
124                tracing::debug!(
125                    "GrpcBindingsProvider: Creating shared gRPC channel to {}",
126                    grpc_address
127                );
128                create_grpc_channel(grpc_address).await
129            })
130            .await?;
131
132        Ok(channel.clone())
133    }
134
135    /// Public method to get the shared channel for use by other components (e.g., WaitUntilContext).
136    /// This enables the entire AlienContext to use a single gRPC channel.
137    pub async fn get_shared_channel(&self) -> Result<Channel, Error> {
138        self.get_channel().await
139    }
140}
141
142#[async_trait]
143impl BindingsProviderApi for GrpcBindingsProvider {
144    async fn load_storage(&self, binding_name: &str) -> Result<Arc<dyn Storage>, Error> {
145        tracing::debug!(
146            "GrpcBindingsProvider::load_storage: Loading storage binding: {}",
147            binding_name
148        );
149
150        let channel = self
151            .get_channel()
152            .await
153            .context(ErrorData::BindingConfigInvalid {
154                binding_name: binding_name.to_string(),
155                reason: "Failed to get shared gRPC channel".to_string(),
156            })?;
157
158        let storage = Arc::new(
159            GrpcStorage::new_from_channel(channel, binding_name.to_string())
160                .await
161                .context(ErrorData::BindingConfigInvalid {
162                    binding_name: binding_name.to_string(),
163                    reason: "Failed to initialize gRPC storage".to_string(),
164                })?,
165        );
166
167        tracing::debug!(
168            "GrpcBindingsProvider::load_storage: Successfully loaded storage binding: {}",
169            binding_name
170        );
171        Ok(storage)
172    }
173
174    async fn load_build(&self, binding_name: &str) -> Result<Arc<dyn Build>, Error> {
175        let channel = self
176            .get_channel()
177            .await
178            .context(ErrorData::BindingConfigInvalid {
179                binding_name: binding_name.to_string(),
180                reason: "Failed to get shared gRPC channel".to_string(),
181            })?;
182
183        let build = Arc::new(
184            GrpcBuild::new_from_channel(channel, binding_name.to_string())
185                .await
186                .context(ErrorData::BindingConfigInvalid {
187                    binding_name: binding_name.to_string(),
188                    reason: "Failed to initialize gRPC build".to_string(),
189                })?,
190        );
191
192        Ok(build)
193    }
194
195    async fn load_artifact_registry(
196        &self,
197        binding_name: &str,
198    ) -> Result<Arc<dyn ArtifactRegistry>, Error> {
199        let channel = self
200            .get_channel()
201            .await
202            .context(ErrorData::BindingConfigInvalid {
203                binding_name: binding_name.to_string(),
204                reason: "Failed to get shared gRPC channel".to_string(),
205            })?;
206
207        let artifact_registry = Arc::new(
208            GrpcArtifactRegistry::new_from_channel(channel, binding_name.to_string())
209                .await
210                .context(ErrorData::BindingConfigInvalid {
211                    binding_name: binding_name.to_string(),
212                    reason: "Failed to initialize gRPC artifact registry".to_string(),
213                })?,
214        );
215
216        Ok(artifact_registry)
217    }
218
219    async fn load_vault(&self, binding_name: &str) -> Result<Arc<dyn Vault>, Error> {
220        let channel = self
221            .get_channel()
222            .await
223            .context(ErrorData::BindingConfigInvalid {
224                binding_name: binding_name.to_string(),
225                reason: "Failed to get shared gRPC channel".to_string(),
226            })?;
227
228        let vault = Arc::new(
229            GrpcVault::new_from_channel(channel, binding_name.to_string())
230                .await
231                .context(ErrorData::BindingConfigInvalid {
232                    binding_name: binding_name.to_string(),
233                    reason: "Failed to initialize gRPC vault".to_string(),
234                })?,
235        );
236
237        Ok(vault)
238    }
239
240    async fn load_kv(&self, binding_name: &str) -> Result<Arc<dyn Kv>, Error> {
241        let channel = self
242            .get_channel()
243            .await
244            .context(ErrorData::BindingConfigInvalid {
245                binding_name: binding_name.to_string(),
246                reason: "Failed to get shared gRPC channel".to_string(),
247            })?;
248
249        let kv = Arc::new(
250            GrpcKv::new_from_channel(channel, binding_name.to_string())
251                .await
252                .context(ErrorData::BindingConfigInvalid {
253                    binding_name: binding_name.to_string(),
254                    reason: "Failed to initialize gRPC KV".to_string(),
255                })?,
256        );
257
258        Ok(kv)
259    }
260
261    async fn load_queue(&self, binding_name: &str) -> Result<Arc<dyn Queue>, Error> {
262        let channel = self
263            .get_channel()
264            .await
265            .context(ErrorData::BindingConfigInvalid {
266                binding_name: binding_name.to_string(),
267                reason: "Failed to get shared gRPC channel".to_string(),
268            })?;
269
270        let queue = Arc::new(
271            GrpcQueue::new_from_channel(channel, binding_name.to_string())
272                .await
273                .context(ErrorData::BindingConfigInvalid {
274                    binding_name: binding_name.to_string(),
275                    reason: "Failed to initialize gRPC Queue".to_string(),
276                })?,
277        );
278
279        Ok(queue)
280    }
281
282    async fn load_function(&self, binding_name: &str) -> Result<Arc<dyn Function>, Error> {
283        let channel = self
284            .get_channel()
285            .await
286            .context(ErrorData::BindingConfigInvalid {
287                binding_name: binding_name.to_string(),
288                reason: "Failed to get shared gRPC channel".to_string(),
289            })?;
290
291        let function = Arc::new(
292            GrpcFunction::new_from_channel(channel, binding_name.to_string())
293                .await
294                .context(ErrorData::BindingConfigInvalid {
295                    binding_name: binding_name.to_string(),
296                    reason: "Failed to initialize gRPC Function".to_string(),
297                })?,
298        );
299
300        Ok(function)
301    }
302
303    async fn load_container(&self, binding_name: &str) -> Result<Arc<dyn Container>, Error> {
304        use crate::providers::container::GrpcContainer;
305
306        let channel = self
307            .get_channel()
308            .await
309            .context(ErrorData::BindingConfigInvalid {
310                binding_name: binding_name.to_string(),
311                reason: "Failed to get shared gRPC channel".to_string(),
312            })?;
313
314        let container = Arc::new(
315            GrpcContainer::new_from_channel(channel, binding_name.to_string())
316                .await
317                .context(ErrorData::BindingConfigInvalid {
318                    binding_name: binding_name.to_string(),
319                    reason: "Failed to initialize gRPC Container".to_string(),
320                })?,
321        );
322
323        Ok(container)
324    }
325
326    async fn load_service_account(
327        &self,
328        binding_name: &str,
329    ) -> Result<Arc<dyn ServiceAccount>, Error> {
330        let channel = self
331            .get_channel()
332            .await
333            .context(ErrorData::BindingConfigInvalid {
334                binding_name: binding_name.to_string(),
335                reason: "Failed to get shared gRPC channel".to_string(),
336            })?;
337
338        let service_account = Arc::new(
339            GrpcServiceAccount::new_from_channel(channel, binding_name.to_string())
340                .await
341                .context(ErrorData::BindingConfigInvalid {
342                    binding_name: binding_name.to_string(),
343                    reason: "Failed to initialize gRPC service account".to_string(),
344                })?,
345        );
346
347        Ok(service_account)
348    }
349}