alien_bindings/providers/
grpc_provider.rs1use crate::{
2 error::{Error, ErrorData},
3 providers::{
4 artifact_registry::grpc::GrpcArtifactRegistry, build::grpc::GrpcBuild,
5 function::grpc::GrpcFunction, kv::grpc::GrpcKv, queue::grpc::GrpcQueue,
6 service_account::grpc::GrpcServiceAccount, storage::grpc::GrpcStorage,
7 vault::grpc::GrpcVault,
8 },
9 traits::{
10 ArtifactRegistry, BindingsProviderApi, Build, Container, Function, Kv, Queue,
11 ServiceAccount, Storage, Vault,
12 },
13};
14use alien_error::{AlienError, Context, IntoAlienError};
15use async_trait::async_trait;
16use std::{collections::HashMap, sync::Arc};
17use tonic::transport::Channel;
18
19pub async fn create_grpc_channel(grpc_address: String) -> crate::error::Result<Channel> {
22 tracing::debug!(
23 "create_grpc_channel: Connecting to gRPC server at: {}",
24 grpc_address
25 );
26
27 let endpoint_uri = if grpc_address.contains("://") {
29 grpc_address.clone()
30 } else {
31 format!("http://{}", grpc_address)
32 };
33
34 tracing::debug!("create_grpc_channel: Endpoint URI: {}", endpoint_uri);
35
36 let endpoint = Channel::from_shared(endpoint_uri.clone())
37 .into_alien_error()
38 .context(ErrorData::GrpcConnectionFailed {
39 endpoint: endpoint_uri.clone(),
40 reason: "Invalid gRPC endpoint URI format".to_string(),
41 })?
42 .timeout(std::time::Duration::from_secs(30)) .connect_timeout(std::time::Duration::from_secs(5)) .http2_keep_alive_interval(std::time::Duration::from_secs(30)) .keep_alive_timeout(std::time::Duration::from_secs(10)) .keep_alive_while_idle(true); tracing::debug!("create_grpc_channel: Attempting to connect to endpoint");
49 let channel =
50 endpoint
51 .connect()
52 .await
53 .into_alien_error()
54 .context(ErrorData::GrpcConnectionFailed {
55 endpoint: grpc_address.clone(),
56 reason: "Failed to establish gRPC connection".to_string(),
57 })?;
58
59 tracing::debug!(
60 "create_grpc_channel: Successfully connected to {}",
61 grpc_address
62 );
63 Ok(channel)
64}
65
66#[derive(Debug, Clone)]
73pub struct GrpcBindingsProvider {
74 env: HashMap<String, String>,
75 channel: Arc<tokio::sync::OnceCell<Channel>>,
77}
78
79impl GrpcBindingsProvider {
80 #[cfg(not(target_arch = "wasm32"))]
83 pub fn new() -> Result<Self, Error> {
84 Self::new_with_env(std::env::vars().collect())
85 }
86
87 #[cfg(target_arch = "wasm32")]
90 pub fn new() -> Result<Self, Error> {
91 Self::new_with_env(HashMap::new())
92 }
93
94 pub fn new_with_env(env: HashMap<String, String>) -> Result<Self, Error> {
96 Ok(Self {
97 env,
98 channel: Arc::new(tokio::sync::OnceCell::new()),
99 })
100 }
101
102 fn get_env_var(&self, var_name: &str) -> Result<String, Error> {
104 self.env.get(var_name).cloned().ok_or_else(|| {
105 AlienError::new(ErrorData::EnvironmentVariableMissing {
106 variable_name: var_name.to_string(),
107 })
108 })
109 }
110
111 fn get_grpc_address(&self) -> Result<String, Error> {
113 self.get_env_var("ALIEN_BINDINGS_GRPC_ADDRESS")
114 }
115
116 async fn get_channel(&self) -> Result<Channel, Error> {
120 let channel = self
121 .channel
122 .get_or_try_init(|| async {
123 let grpc_address = self.get_grpc_address()?;
124 tracing::debug!(
125 "GrpcBindingsProvider: Creating shared gRPC channel to {}",
126 grpc_address
127 );
128 create_grpc_channel(grpc_address).await
129 })
130 .await?;
131
132 Ok(channel.clone())
133 }
134
135 pub async fn get_shared_channel(&self) -> Result<Channel, Error> {
138 self.get_channel().await
139 }
140}
141
142#[async_trait]
143impl BindingsProviderApi for GrpcBindingsProvider {
144 async fn load_storage(&self, binding_name: &str) -> Result<Arc<dyn Storage>, Error> {
145 tracing::debug!(
146 "GrpcBindingsProvider::load_storage: Loading storage binding: {}",
147 binding_name
148 );
149
150 let channel = self
151 .get_channel()
152 .await
153 .context(ErrorData::BindingConfigInvalid {
154 binding_name: binding_name.to_string(),
155 reason: "Failed to get shared gRPC channel".to_string(),
156 })?;
157
158 let storage = Arc::new(
159 GrpcStorage::new_from_channel(channel, binding_name.to_string())
160 .await
161 .context(ErrorData::BindingConfigInvalid {
162 binding_name: binding_name.to_string(),
163 reason: "Failed to initialize gRPC storage".to_string(),
164 })?,
165 );
166
167 tracing::debug!(
168 "GrpcBindingsProvider::load_storage: Successfully loaded storage binding: {}",
169 binding_name
170 );
171 Ok(storage)
172 }
173
174 async fn load_build(&self, binding_name: &str) -> Result<Arc<dyn Build>, Error> {
175 let channel = self
176 .get_channel()
177 .await
178 .context(ErrorData::BindingConfigInvalid {
179 binding_name: binding_name.to_string(),
180 reason: "Failed to get shared gRPC channel".to_string(),
181 })?;
182
183 let build = Arc::new(
184 GrpcBuild::new_from_channel(channel, binding_name.to_string())
185 .await
186 .context(ErrorData::BindingConfigInvalid {
187 binding_name: binding_name.to_string(),
188 reason: "Failed to initialize gRPC build".to_string(),
189 })?,
190 );
191
192 Ok(build)
193 }
194
195 async fn load_artifact_registry(
196 &self,
197 binding_name: &str,
198 ) -> Result<Arc<dyn ArtifactRegistry>, Error> {
199 let channel = self
200 .get_channel()
201 .await
202 .context(ErrorData::BindingConfigInvalid {
203 binding_name: binding_name.to_string(),
204 reason: "Failed to get shared gRPC channel".to_string(),
205 })?;
206
207 let artifact_registry = Arc::new(
208 GrpcArtifactRegistry::new_from_channel(channel, binding_name.to_string())
209 .await
210 .context(ErrorData::BindingConfigInvalid {
211 binding_name: binding_name.to_string(),
212 reason: "Failed to initialize gRPC artifact registry".to_string(),
213 })?,
214 );
215
216 Ok(artifact_registry)
217 }
218
219 async fn load_vault(&self, binding_name: &str) -> Result<Arc<dyn Vault>, Error> {
220 let channel = self
221 .get_channel()
222 .await
223 .context(ErrorData::BindingConfigInvalid {
224 binding_name: binding_name.to_string(),
225 reason: "Failed to get shared gRPC channel".to_string(),
226 })?;
227
228 let vault = Arc::new(
229 GrpcVault::new_from_channel(channel, binding_name.to_string())
230 .await
231 .context(ErrorData::BindingConfigInvalid {
232 binding_name: binding_name.to_string(),
233 reason: "Failed to initialize gRPC vault".to_string(),
234 })?,
235 );
236
237 Ok(vault)
238 }
239
240 async fn load_kv(&self, binding_name: &str) -> Result<Arc<dyn Kv>, Error> {
241 let channel = self
242 .get_channel()
243 .await
244 .context(ErrorData::BindingConfigInvalid {
245 binding_name: binding_name.to_string(),
246 reason: "Failed to get shared gRPC channel".to_string(),
247 })?;
248
249 let kv = Arc::new(
250 GrpcKv::new_from_channel(channel, binding_name.to_string())
251 .await
252 .context(ErrorData::BindingConfigInvalid {
253 binding_name: binding_name.to_string(),
254 reason: "Failed to initialize gRPC KV".to_string(),
255 })?,
256 );
257
258 Ok(kv)
259 }
260
261 async fn load_queue(&self, binding_name: &str) -> Result<Arc<dyn Queue>, Error> {
262 let channel = self
263 .get_channel()
264 .await
265 .context(ErrorData::BindingConfigInvalid {
266 binding_name: binding_name.to_string(),
267 reason: "Failed to get shared gRPC channel".to_string(),
268 })?;
269
270 let queue = Arc::new(
271 GrpcQueue::new_from_channel(channel, binding_name.to_string())
272 .await
273 .context(ErrorData::BindingConfigInvalid {
274 binding_name: binding_name.to_string(),
275 reason: "Failed to initialize gRPC Queue".to_string(),
276 })?,
277 );
278
279 Ok(queue)
280 }
281
282 async fn load_function(&self, binding_name: &str) -> Result<Arc<dyn Function>, Error> {
283 let channel = self
284 .get_channel()
285 .await
286 .context(ErrorData::BindingConfigInvalid {
287 binding_name: binding_name.to_string(),
288 reason: "Failed to get shared gRPC channel".to_string(),
289 })?;
290
291 let function = Arc::new(
292 GrpcFunction::new_from_channel(channel, binding_name.to_string())
293 .await
294 .context(ErrorData::BindingConfigInvalid {
295 binding_name: binding_name.to_string(),
296 reason: "Failed to initialize gRPC Function".to_string(),
297 })?,
298 );
299
300 Ok(function)
301 }
302
303 async fn load_container(&self, binding_name: &str) -> Result<Arc<dyn Container>, Error> {
304 use crate::providers::container::GrpcContainer;
305
306 let channel = self
307 .get_channel()
308 .await
309 .context(ErrorData::BindingConfigInvalid {
310 binding_name: binding_name.to_string(),
311 reason: "Failed to get shared gRPC channel".to_string(),
312 })?;
313
314 let container = Arc::new(
315 GrpcContainer::new_from_channel(channel, binding_name.to_string())
316 .await
317 .context(ErrorData::BindingConfigInvalid {
318 binding_name: binding_name.to_string(),
319 reason: "Failed to initialize gRPC Container".to_string(),
320 })?,
321 );
322
323 Ok(container)
324 }
325
326 async fn load_service_account(
327 &self,
328 binding_name: &str,
329 ) -> Result<Arc<dyn ServiceAccount>, Error> {
330 let channel = self
331 .get_channel()
332 .await
333 .context(ErrorData::BindingConfigInvalid {
334 binding_name: binding_name.to_string(),
335 reason: "Failed to get shared gRPC channel".to_string(),
336 })?;
337
338 let service_account = Arc::new(
339 GrpcServiceAccount::new_from_channel(channel, binding_name.to_string())
340 .await
341 .context(ErrorData::BindingConfigInvalid {
342 binding_name: binding_name.to_string(),
343 reason: "Failed to initialize gRPC service account".to_string(),
344 })?,
345 );
346
347 Ok(service_account)
348 }
349}