// alien_bindings/providers/grpc_provider.rs

1use crate::{
2    error::{Error, ErrorData},
3    providers::{
4        artifact_registry::grpc::GrpcArtifactRegistry, build::grpc::GrpcBuild, kv::grpc::GrpcKv,
5        queue::grpc::GrpcQueue, service_account::grpc::GrpcServiceAccount,
6        storage::grpc::GrpcStorage, vault::grpc::GrpcVault, worker::grpc::GrpcWorker,
7    },
8    traits::{
9        ArtifactRegistry, BindingsProviderApi, Build, Container, Kv, Queue, ServiceAccount,
10        Storage, Vault, Worker,
11    },
12};
13use alien_error::{AlienError, Context, IntoAlienError};
14use async_trait::async_trait;
15use std::{collections::HashMap, sync::Arc};
16use tonic::transport::Channel;
17
18/// Creates a gRPC channel from an address string, handling URI scheme normalization
19/// and connection establishment with proper error handling.
20pub async fn create_grpc_channel(grpc_address: String) -> crate::error::Result<Channel> {
21    tracing::debug!(
22        "create_grpc_channel: Connecting to gRPC server at: {}",
23        grpc_address
24    );
25
26    // Ensure the address has a scheme, default to http if not present
27    let endpoint_uri = if grpc_address.contains("://") {
28        grpc_address.clone()
29    } else {
30        format!("http://{}", grpc_address)
31    };
32
33    tracing::debug!("create_grpc_channel: Endpoint URI: {}", endpoint_uri);
34
35    let endpoint = Channel::from_shared(endpoint_uri.clone())
36        .into_alien_error()
37        .context(ErrorData::GrpcConnectionFailed {
38            endpoint: endpoint_uri.clone(),
39            reason: "Invalid gRPC endpoint URI format".to_string(),
40        })?
41        .timeout(std::time::Duration::from_secs(30)) // Request timeout
42        .connect_timeout(std::time::Duration::from_secs(5)) // Connection establishment timeout
43        .http2_keep_alive_interval(std::time::Duration::from_secs(30)) // Send keep-alive pings
44        .keep_alive_timeout(std::time::Duration::from_secs(10)) // Keep-alive response timeout
45        .keep_alive_while_idle(true); // Keep connection alive even when idle
46
47    tracing::debug!("create_grpc_channel: Attempting to connect to endpoint");
48    let channel =
49        endpoint
50            .connect()
51            .await
52            .into_alien_error()
53            .context(ErrorData::GrpcConnectionFailed {
54                endpoint: grpc_address.clone(),
55                reason: "Failed to establish gRPC connection".to_string(),
56            })?;
57
58    tracing::debug!(
59        "create_grpc_channel: Successfully connected to {}",
60        grpc_address
61    );
62    Ok(channel)
63}
64
65/// gRPC implementation of the `BindingsProvider` trait.
66///
67/// This provider connects to a gRPC endpoint specified by the
68/// `ALIEN_BINDINGS_GRPC_ADDRESS` environment variable.
69///
70/// Uses a shared gRPC channel to avoid file descriptor exhaustion.
71#[derive(Debug, Clone)]
72pub struct GrpcBindingsProvider {
73    env: HashMap<String, String>,
74    /// Shared gRPC channel, created lazily on first use
75    channel: Arc<tokio::sync::OnceCell<Channel>>,
76}
77
78impl GrpcBindingsProvider {
79    /// Creates a new provider that reads environment variables from `std::env`.
80    /// This is disabled on wasm32 targets.
81    #[cfg(not(target_arch = "wasm32"))]
82    pub fn new() -> Result<Self, Error> {
83        Self::new_with_env(std::env::vars().collect())
84    }
85
86    /// Creates a new provider with an empty environment map.
87    /// This is the default constructor on wasm32 targets.
88    #[cfg(target_arch = "wasm32")]
89    pub fn new() -> Result<Self, Error> {
90        Self::new_with_env(HashMap::new())
91    }
92
93    /// Creates a new provider that reads environment variables from the provided map.
94    pub fn new_with_env(env: HashMap<String, String>) -> Result<Self, Error> {
95        Ok(Self {
96            env,
97            channel: Arc::new(tokio::sync::OnceCell::new()),
98        })
99    }
100
101    // Helper function to get environment variable, returning crate::Error
102    fn get_env_var(&self, var_name: &str) -> Result<String, Error> {
103        self.env.get(var_name).cloned().ok_or_else(|| {
104            AlienError::new(ErrorData::EnvironmentVariableMissing {
105                variable_name: var_name.to_string(),
106            })
107        })
108    }
109
110    // Helper function to get gRPC address for any binding
111    fn get_grpc_address(&self) -> Result<String, Error> {
112        self.get_env_var("ALIEN_BINDINGS_GRPC_ADDRESS")
113    }
114
115    /// Get or create the shared gRPC channel.
116    /// This ensures we only have ONE channel per GrpcBindingsProvider instance,
117    /// preventing file descriptor exhaustion.
118    async fn get_channel(&self) -> Result<Channel, Error> {
119        let channel = self
120            .channel
121            .get_or_try_init(|| async {
122                let grpc_address = self.get_grpc_address()?;
123                tracing::debug!(
124                    "GrpcBindingsProvider: Creating shared gRPC channel to {}",
125                    grpc_address
126                );
127                create_grpc_channel(grpc_address).await
128            })
129            .await?;
130
131        Ok(channel.clone())
132    }
133
134    /// Public method to get the shared channel for use by other components (e.g., WaitUntilContext).
135    /// This enables the entire AlienContext to use a single gRPC channel.
136    pub async fn get_shared_channel(&self) -> Result<Channel, Error> {
137        self.get_channel().await
138    }
139}
140
141#[async_trait]
142impl BindingsProviderApi for GrpcBindingsProvider {
143    async fn load_storage(&self, binding_name: &str) -> Result<Arc<dyn Storage>, Error> {
144        tracing::debug!(
145            "GrpcBindingsProvider::load_storage: Loading storage binding: {}",
146            binding_name
147        );
148
149        let channel = self
150            .get_channel()
151            .await
152            .context(ErrorData::BindingConfigInvalid {
153                binding_name: binding_name.to_string(),
154                reason: "Failed to get shared gRPC channel".to_string(),
155            })?;
156
157        let storage = Arc::new(
158            GrpcStorage::new_from_channel(channel, binding_name.to_string())
159                .await
160                .context(ErrorData::BindingConfigInvalid {
161                    binding_name: binding_name.to_string(),
162                    reason: "Failed to initialize gRPC storage".to_string(),
163                })?,
164        );
165
166        tracing::debug!(
167            "GrpcBindingsProvider::load_storage: Successfully loaded storage binding: {}",
168            binding_name
169        );
170        Ok(storage)
171    }
172
173    async fn load_build(&self, binding_name: &str) -> Result<Arc<dyn Build>, Error> {
174        let channel = self
175            .get_channel()
176            .await
177            .context(ErrorData::BindingConfigInvalid {
178                binding_name: binding_name.to_string(),
179                reason: "Failed to get shared gRPC channel".to_string(),
180            })?;
181
182        let build = Arc::new(
183            GrpcBuild::new_from_channel(channel, binding_name.to_string())
184                .await
185                .context(ErrorData::BindingConfigInvalid {
186                    binding_name: binding_name.to_string(),
187                    reason: "Failed to initialize gRPC build".to_string(),
188                })?,
189        );
190
191        Ok(build)
192    }
193
194    async fn load_artifact_registry(
195        &self,
196        binding_name: &str,
197    ) -> Result<Arc<dyn ArtifactRegistry>, Error> {
198        let channel = self
199            .get_channel()
200            .await
201            .context(ErrorData::BindingConfigInvalid {
202                binding_name: binding_name.to_string(),
203                reason: "Failed to get shared gRPC channel".to_string(),
204            })?;
205
206        let artifact_registry = Arc::new(
207            GrpcArtifactRegistry::new_from_channel(channel, binding_name.to_string())
208                .await
209                .context(ErrorData::BindingConfigInvalid {
210                    binding_name: binding_name.to_string(),
211                    reason: "Failed to initialize gRPC artifact registry".to_string(),
212                })?,
213        );
214
215        Ok(artifact_registry)
216    }
217
218    async fn load_vault(&self, binding_name: &str) -> Result<Arc<dyn Vault>, Error> {
219        let channel = self
220            .get_channel()
221            .await
222            .context(ErrorData::BindingConfigInvalid {
223                binding_name: binding_name.to_string(),
224                reason: "Failed to get shared gRPC channel".to_string(),
225            })?;
226
227        let vault = Arc::new(
228            GrpcVault::new_from_channel(channel, binding_name.to_string())
229                .await
230                .context(ErrorData::BindingConfigInvalid {
231                    binding_name: binding_name.to_string(),
232                    reason: "Failed to initialize gRPC vault".to_string(),
233                })?,
234        );
235
236        Ok(vault)
237    }
238
239    async fn load_kv(&self, binding_name: &str) -> Result<Arc<dyn Kv>, Error> {
240        let channel = self
241            .get_channel()
242            .await
243            .context(ErrorData::BindingConfigInvalid {
244                binding_name: binding_name.to_string(),
245                reason: "Failed to get shared gRPC channel".to_string(),
246            })?;
247
248        let kv = Arc::new(
249            GrpcKv::new_from_channel(channel, binding_name.to_string())
250                .await
251                .context(ErrorData::BindingConfigInvalid {
252                    binding_name: binding_name.to_string(),
253                    reason: "Failed to initialize gRPC KV".to_string(),
254                })?,
255        );
256
257        Ok(kv)
258    }
259
260    async fn load_queue(&self, binding_name: &str) -> Result<Arc<dyn Queue>, Error> {
261        let channel = self
262            .get_channel()
263            .await
264            .context(ErrorData::BindingConfigInvalid {
265                binding_name: binding_name.to_string(),
266                reason: "Failed to get shared gRPC channel".to_string(),
267            })?;
268
269        let queue = Arc::new(
270            GrpcQueue::new_from_channel(channel, binding_name.to_string())
271                .await
272                .context(ErrorData::BindingConfigInvalid {
273                    binding_name: binding_name.to_string(),
274                    reason: "Failed to initialize gRPC Queue".to_string(),
275                })?,
276        );
277
278        Ok(queue)
279    }
280
281    async fn load_worker(&self, binding_name: &str) -> Result<Arc<dyn Worker>, Error> {
282        let channel = self
283            .get_channel()
284            .await
285            .context(ErrorData::BindingConfigInvalid {
286                binding_name: binding_name.to_string(),
287                reason: "Failed to get shared gRPC channel".to_string(),
288            })?;
289
290        let function = Arc::new(
291            GrpcWorker::new_from_channel(channel, binding_name.to_string())
292                .await
293                .context(ErrorData::BindingConfigInvalid {
294                    binding_name: binding_name.to_string(),
295                    reason: "Failed to initialize gRPC Function".to_string(),
296                })?,
297        );
298
299        Ok(function)
300    }
301
302    async fn load_container(&self, binding_name: &str) -> Result<Arc<dyn Container>, Error> {
303        use crate::providers::container::GrpcContainer;
304
305        let channel = self
306            .get_channel()
307            .await
308            .context(ErrorData::BindingConfigInvalid {
309                binding_name: binding_name.to_string(),
310                reason: "Failed to get shared gRPC channel".to_string(),
311            })?;
312
313        let container = Arc::new(
314            GrpcContainer::new_from_channel(channel, binding_name.to_string())
315                .await
316                .context(ErrorData::BindingConfigInvalid {
317                    binding_name: binding_name.to_string(),
318                    reason: "Failed to initialize gRPC Container".to_string(),
319                })?,
320        );
321
322        Ok(container)
323    }
324
325    async fn load_service_account(
326        &self,
327        binding_name: &str,
328    ) -> Result<Arc<dyn ServiceAccount>, Error> {
329        let channel = self
330            .get_channel()
331            .await
332            .context(ErrorData::BindingConfigInvalid {
333                binding_name: binding_name.to_string(),
334                reason: "Failed to get shared gRPC channel".to_string(),
335            })?;
336
337        let service_account = Arc::new(
338            GrpcServiceAccount::new_from_channel(channel, binding_name.to_string())
339                .await
340                .context(ErrorData::BindingConfigInvalid {
341                    binding_name: binding_name.to_string(),
342                    reason: "Failed to initialize gRPC service account".to_string(),
343                })?,
344        );
345
346        Ok(service_account)
347    }
348}