alien_bindings/providers/
grpc_provider.rs1use crate::{
2 error::{Error, ErrorData},
3 providers::{
4 artifact_registry::grpc::GrpcArtifactRegistry, build::grpc::GrpcBuild, kv::grpc::GrpcKv,
5 queue::grpc::GrpcQueue, service_account::grpc::GrpcServiceAccount,
6 storage::grpc::GrpcStorage, vault::grpc::GrpcVault, worker::grpc::GrpcWorker,
7 },
8 traits::{
9 ArtifactRegistry, BindingsProviderApi, Build, Container, Kv, Queue, ServiceAccount,
10 Storage, Vault, Worker,
11 },
12};
13use alien_error::{AlienError, Context, IntoAlienError};
14use async_trait::async_trait;
15use std::{collections::HashMap, sync::Arc};
16use tonic::transport::Channel;
17
18pub async fn create_grpc_channel(grpc_address: String) -> crate::error::Result<Channel> {
21 tracing::debug!(
22 "create_grpc_channel: Connecting to gRPC server at: {}",
23 grpc_address
24 );
25
26 let endpoint_uri = if grpc_address.contains("://") {
28 grpc_address.clone()
29 } else {
30 format!("http://{}", grpc_address)
31 };
32
33 tracing::debug!("create_grpc_channel: Endpoint URI: {}", endpoint_uri);
34
35 let endpoint = Channel::from_shared(endpoint_uri.clone())
36 .into_alien_error()
37 .context(ErrorData::GrpcConnectionFailed {
38 endpoint: endpoint_uri.clone(),
39 reason: "Invalid gRPC endpoint URI format".to_string(),
40 })?
41 .timeout(std::time::Duration::from_secs(30)) .connect_timeout(std::time::Duration::from_secs(5)) .http2_keep_alive_interval(std::time::Duration::from_secs(30)) .keep_alive_timeout(std::time::Duration::from_secs(10)) .keep_alive_while_idle(true); tracing::debug!("create_grpc_channel: Attempting to connect to endpoint");
48 let channel =
49 endpoint
50 .connect()
51 .await
52 .into_alien_error()
53 .context(ErrorData::GrpcConnectionFailed {
54 endpoint: grpc_address.clone(),
55 reason: "Failed to establish gRPC connection".to_string(),
56 })?;
57
58 tracing::debug!(
59 "create_grpc_channel: Successfully connected to {}",
60 grpc_address
61 );
62 Ok(channel)
63}
64
65#[derive(Debug, Clone)]
72pub struct GrpcBindingsProvider {
73 env: HashMap<String, String>,
74 channel: Arc<tokio::sync::OnceCell<Channel>>,
76}
77
78impl GrpcBindingsProvider {
79 #[cfg(not(target_arch = "wasm32"))]
82 pub fn new() -> Result<Self, Error> {
83 Self::new_with_env(std::env::vars().collect())
84 }
85
86 #[cfg(target_arch = "wasm32")]
89 pub fn new() -> Result<Self, Error> {
90 Self::new_with_env(HashMap::new())
91 }
92
93 pub fn new_with_env(env: HashMap<String, String>) -> Result<Self, Error> {
95 Ok(Self {
96 env,
97 channel: Arc::new(tokio::sync::OnceCell::new()),
98 })
99 }
100
101 fn get_env_var(&self, var_name: &str) -> Result<String, Error> {
103 self.env.get(var_name).cloned().ok_or_else(|| {
104 AlienError::new(ErrorData::EnvironmentVariableMissing {
105 variable_name: var_name.to_string(),
106 })
107 })
108 }
109
110 fn get_grpc_address(&self) -> Result<String, Error> {
112 self.get_env_var("ALIEN_BINDINGS_GRPC_ADDRESS")
113 }
114
115 async fn get_channel(&self) -> Result<Channel, Error> {
119 let channel = self
120 .channel
121 .get_or_try_init(|| async {
122 let grpc_address = self.get_grpc_address()?;
123 tracing::debug!(
124 "GrpcBindingsProvider: Creating shared gRPC channel to {}",
125 grpc_address
126 );
127 create_grpc_channel(grpc_address).await
128 })
129 .await?;
130
131 Ok(channel.clone())
132 }
133
134 pub async fn get_shared_channel(&self) -> Result<Channel, Error> {
137 self.get_channel().await
138 }
139}
140
141#[async_trait]
142impl BindingsProviderApi for GrpcBindingsProvider {
143 async fn load_storage(&self, binding_name: &str) -> Result<Arc<dyn Storage>, Error> {
144 tracing::debug!(
145 "GrpcBindingsProvider::load_storage: Loading storage binding: {}",
146 binding_name
147 );
148
149 let channel = self
150 .get_channel()
151 .await
152 .context(ErrorData::BindingConfigInvalid {
153 binding_name: binding_name.to_string(),
154 reason: "Failed to get shared gRPC channel".to_string(),
155 })?;
156
157 let storage = Arc::new(
158 GrpcStorage::new_from_channel(channel, binding_name.to_string())
159 .await
160 .context(ErrorData::BindingConfigInvalid {
161 binding_name: binding_name.to_string(),
162 reason: "Failed to initialize gRPC storage".to_string(),
163 })?,
164 );
165
166 tracing::debug!(
167 "GrpcBindingsProvider::load_storage: Successfully loaded storage binding: {}",
168 binding_name
169 );
170 Ok(storage)
171 }
172
173 async fn load_build(&self, binding_name: &str) -> Result<Arc<dyn Build>, Error> {
174 let channel = self
175 .get_channel()
176 .await
177 .context(ErrorData::BindingConfigInvalid {
178 binding_name: binding_name.to_string(),
179 reason: "Failed to get shared gRPC channel".to_string(),
180 })?;
181
182 let build = Arc::new(
183 GrpcBuild::new_from_channel(channel, binding_name.to_string())
184 .await
185 .context(ErrorData::BindingConfigInvalid {
186 binding_name: binding_name.to_string(),
187 reason: "Failed to initialize gRPC build".to_string(),
188 })?,
189 );
190
191 Ok(build)
192 }
193
194 async fn load_artifact_registry(
195 &self,
196 binding_name: &str,
197 ) -> Result<Arc<dyn ArtifactRegistry>, Error> {
198 let channel = self
199 .get_channel()
200 .await
201 .context(ErrorData::BindingConfigInvalid {
202 binding_name: binding_name.to_string(),
203 reason: "Failed to get shared gRPC channel".to_string(),
204 })?;
205
206 let artifact_registry = Arc::new(
207 GrpcArtifactRegistry::new_from_channel(channel, binding_name.to_string())
208 .await
209 .context(ErrorData::BindingConfigInvalid {
210 binding_name: binding_name.to_string(),
211 reason: "Failed to initialize gRPC artifact registry".to_string(),
212 })?,
213 );
214
215 Ok(artifact_registry)
216 }
217
218 async fn load_vault(&self, binding_name: &str) -> Result<Arc<dyn Vault>, Error> {
219 let channel = self
220 .get_channel()
221 .await
222 .context(ErrorData::BindingConfigInvalid {
223 binding_name: binding_name.to_string(),
224 reason: "Failed to get shared gRPC channel".to_string(),
225 })?;
226
227 let vault = Arc::new(
228 GrpcVault::new_from_channel(channel, binding_name.to_string())
229 .await
230 .context(ErrorData::BindingConfigInvalid {
231 binding_name: binding_name.to_string(),
232 reason: "Failed to initialize gRPC vault".to_string(),
233 })?,
234 );
235
236 Ok(vault)
237 }
238
239 async fn load_kv(&self, binding_name: &str) -> Result<Arc<dyn Kv>, Error> {
240 let channel = self
241 .get_channel()
242 .await
243 .context(ErrorData::BindingConfigInvalid {
244 binding_name: binding_name.to_string(),
245 reason: "Failed to get shared gRPC channel".to_string(),
246 })?;
247
248 let kv = Arc::new(
249 GrpcKv::new_from_channel(channel, binding_name.to_string())
250 .await
251 .context(ErrorData::BindingConfigInvalid {
252 binding_name: binding_name.to_string(),
253 reason: "Failed to initialize gRPC KV".to_string(),
254 })?,
255 );
256
257 Ok(kv)
258 }
259
260 async fn load_queue(&self, binding_name: &str) -> Result<Arc<dyn Queue>, Error> {
261 let channel = self
262 .get_channel()
263 .await
264 .context(ErrorData::BindingConfigInvalid {
265 binding_name: binding_name.to_string(),
266 reason: "Failed to get shared gRPC channel".to_string(),
267 })?;
268
269 let queue = Arc::new(
270 GrpcQueue::new_from_channel(channel, binding_name.to_string())
271 .await
272 .context(ErrorData::BindingConfigInvalid {
273 binding_name: binding_name.to_string(),
274 reason: "Failed to initialize gRPC Queue".to_string(),
275 })?,
276 );
277
278 Ok(queue)
279 }
280
281 async fn load_worker(&self, binding_name: &str) -> Result<Arc<dyn Worker>, Error> {
282 let channel = self
283 .get_channel()
284 .await
285 .context(ErrorData::BindingConfigInvalid {
286 binding_name: binding_name.to_string(),
287 reason: "Failed to get shared gRPC channel".to_string(),
288 })?;
289
290 let function = Arc::new(
291 GrpcWorker::new_from_channel(channel, binding_name.to_string())
292 .await
293 .context(ErrorData::BindingConfigInvalid {
294 binding_name: binding_name.to_string(),
295 reason: "Failed to initialize gRPC Function".to_string(),
296 })?,
297 );
298
299 Ok(function)
300 }
301
302 async fn load_container(&self, binding_name: &str) -> Result<Arc<dyn Container>, Error> {
303 use crate::providers::container::GrpcContainer;
304
305 let channel = self
306 .get_channel()
307 .await
308 .context(ErrorData::BindingConfigInvalid {
309 binding_name: binding_name.to_string(),
310 reason: "Failed to get shared gRPC channel".to_string(),
311 })?;
312
313 let container = Arc::new(
314 GrpcContainer::new_from_channel(channel, binding_name.to_string())
315 .await
316 .context(ErrorData::BindingConfigInvalid {
317 binding_name: binding_name.to_string(),
318 reason: "Failed to initialize gRPC Container".to_string(),
319 })?,
320 );
321
322 Ok(container)
323 }
324
325 async fn load_service_account(
326 &self,
327 binding_name: &str,
328 ) -> Result<Arc<dyn ServiceAccount>, Error> {
329 let channel = self
330 .get_channel()
331 .await
332 .context(ErrorData::BindingConfigInvalid {
333 binding_name: binding_name.to_string(),
334 reason: "Failed to get shared gRPC channel".to_string(),
335 })?;
336
337 let service_account = Arc::new(
338 GrpcServiceAccount::new_from_channel(channel, binding_name.to_string())
339 .await
340 .context(ErrorData::BindingConfigInvalid {
341 binding_name: binding_name.to_string(),
342 reason: "Failed to initialize gRPC service account".to_string(),
343 })?,
344 );
345
346 Ok(service_account)
347 }
348}