alien_bindings/grpc/
server.rs1use crate::{
2 error::{ErrorData, Result},
3 grpc::{
4 artifact_registry_service::{
5 alien_bindings::artifact_registry::FILE_DESCRIPTOR_SET as ARTIFACT_REGISTRY_FILE_DESCRIPTOR_SET,
6 ArtifactRegistryGrpcServer,
7 },
8 build_service::{
9 alien_bindings::build::FILE_DESCRIPTOR_SET as BUILD_FILE_DESCRIPTOR_SET,
10 BuildGrpcServer,
11 },
12 container_service::{
13 alien_bindings::container::FILE_DESCRIPTOR_SET as CONTAINER_FILE_DESCRIPTOR_SET,
14 ContainerGrpcServer,
15 },
16 control_service::{
17 alien_bindings::control::FILE_DESCRIPTOR_SET as CONTROL_FILE_DESCRIPTOR_SET,
18 ControlGrpcServer,
19 },
20 kv_service::{
21 alien_bindings::kv::FILE_DESCRIPTOR_SET as KV_FILE_DESCRIPTOR_SET, KvGrpcServer,
22 },
23 queue_service::{
24 alien_bindings::queue::FILE_DESCRIPTOR_SET as QUEUE_FILE_DESCRIPTOR_SET,
25 QueueGrpcServer,
26 },
27 service_account_service::{
28 alien_bindings::service_account::FILE_DESCRIPTOR_SET as SERVICE_ACCOUNT_FILE_DESCRIPTOR_SET,
29 ServiceAccountGrpcServer,
30 },
31 storage_service::{
32 alien_bindings::storage::FILE_DESCRIPTOR_SET as STORAGE_FILE_DESCRIPTOR_SET,
33 StorageGrpcServer,
34 },
35 vault_service::{
36 alien_bindings::vault::FILE_DESCRIPTOR_SET as VAULT_FILE_DESCRIPTOR_SET,
37 VaultGrpcServer,
38 },
39 wait_until_service::{
40 alien_bindings::wait_until::FILE_DESCRIPTOR_SET as WAIT_UNTIL_FILE_DESCRIPTOR_SET,
41 WaitUntilGrpcServer,
42 },
43 worker_service::{
44 alien_bindings::worker::FILE_DESCRIPTOR_SET as WORKER_FILE_DESCRIPTOR_SET,
45 WorkerGrpcServer,
46 },
47 MAX_GRPC_MESSAGE_SIZE,
48 },
49 BindingsProviderApi,
50};
51use alien_error::{Context, IntoAlienError};
52use std::sync::Arc;
53use tokio::sync::oneshot;
54use tokio_stream::wrappers::TcpListenerStream;
55use tonic::transport::Server;
56use tracing::info;
57
58pub struct GrpcServerHandles {
60 pub wait_until_server: Arc<WaitUntilGrpcServer>,
61 pub control_server: Arc<ControlGrpcServer>,
62 pub server_task: tokio::task::JoinHandle<Result<()>>,
63 pub readiness_receiver: oneshot::Receiver<()>,
64}
65
66pub async fn run_grpc_server(
71 provider: Arc<dyn BindingsProviderApi>,
72 addr_str: &str,
73) -> Result<GrpcServerHandles> {
74 let addr: std::net::SocketAddr =
75 addr_str
76 .parse()
77 .into_alien_error()
78 .context(ErrorData::ServerBindFailed {
79 address: addr_str.to_string(),
80 reason: "Invalid address format".to_string(),
81 })?;
82
83 info!("Configuring gRPC server for {}", addr);
84
85 if !addr.ip().is_loopback() {
92 tracing::warn!(
93 address = %addr,
94 "alien-runtime gRPC server is binding to a NON-LOOPBACK address. \
95 This exposes the bindings server (Vault, Storage, KV, Control) to anyone who can \
96 reach this network interface. The server has no authentication. Set \
97 ALIEN_BINDINGS_ADDRESS=127.0.0.1:51351 unless you have a specific reason to expose it."
98 );
99 }
100
101 let wait_until_server = Arc::new(WaitUntilGrpcServer::new());
102 let wait_until_server_handle = wait_until_server.clone();
103
104 let control_server = Arc::new(ControlGrpcServer::new());
105 let control_server_handle = control_server.clone();
106
107 let mut router = Server::builder()
108 .concurrency_limit_per_connection(256) .add_service(
110 StorageGrpcServer::new(provider.clone())
111 .into_service()
112 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
113 )
114 .add_service(
115 BuildGrpcServer::new(provider.clone())
116 .into_service()
117 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
118 )
119 .add_service(
120 ArtifactRegistryGrpcServer::new(provider.clone())
121 .into_service()
122 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
123 )
124 .add_service(
125 VaultGrpcServer::new(provider.clone())
126 .into_service()
127 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
128 )
129 .add_service(
130 KvGrpcServer::new(provider.clone())
131 .into_service()
132 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
133 )
134 .add_service(
135 QueueGrpcServer::new(provider.clone())
136 .into_service()
137 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
138 )
139 .add_service(
140 WorkerGrpcServer::new(provider.clone())
141 .into_service()
142 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
143 )
144 .add_service(
145 ContainerGrpcServer::new(provider.clone())
146 .into_service()
147 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
148 )
149 .add_service(
150 ServiceAccountGrpcServer::new(provider.clone())
151 .into_service()
152 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
153 )
154 .add_service(
155 (*wait_until_server)
156 .clone()
157 .into_service()
158 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
159 )
160 .add_service(
161 (*control_server)
162 .clone()
163 .into_service()
164 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
165 );
166
167 let reflection_service = tonic_reflection::server::Builder::configure()
169 .register_encoded_file_descriptor_set(STORAGE_FILE_DESCRIPTOR_SET)
170 .register_encoded_file_descriptor_set(BUILD_FILE_DESCRIPTOR_SET)
171 .register_encoded_file_descriptor_set(ARTIFACT_REGISTRY_FILE_DESCRIPTOR_SET)
172 .register_encoded_file_descriptor_set(VAULT_FILE_DESCRIPTOR_SET)
173 .register_encoded_file_descriptor_set(KV_FILE_DESCRIPTOR_SET)
174 .register_encoded_file_descriptor_set(QUEUE_FILE_DESCRIPTOR_SET)
175 .register_encoded_file_descriptor_set(WORKER_FILE_DESCRIPTOR_SET)
176 .register_encoded_file_descriptor_set(CONTAINER_FILE_DESCRIPTOR_SET)
177 .register_encoded_file_descriptor_set(SERVICE_ACCOUNT_FILE_DESCRIPTOR_SET)
178 .register_encoded_file_descriptor_set(WAIT_UNTIL_FILE_DESCRIPTOR_SET)
179 .register_encoded_file_descriptor_set(CONTROL_FILE_DESCRIPTOR_SET)
180 .build_v1()
181 .into_alien_error()
182 .context(ErrorData::GrpcServiceUnavailable {
183 service: "reflection".to_string(),
184 endpoint: addr.to_string(),
185 reason: "Failed to build reflection service".to_string(),
186 })?;
187
188 router = router.add_service(reflection_service);
189
190 let (readiness_sender, readiness_receiver) = oneshot::channel();
192
193 let server_task = tokio::spawn(async move {
194 let listener = tokio::net::TcpListener::bind(addr)
196 .await
197 .into_alien_error()
198 .context(ErrorData::ServerBindFailed {
199 address: addr.to_string(),
200 reason: "Failed to bind to address".to_string(),
201 })?;
202
203 info!("gRPC server successfully bound to {}", addr);
204
205 if let Err(_) = readiness_sender.send(()) {
207 info!("Readiness receiver was dropped, continuing with server startup");
209 }
210
211 let incoming = TcpListenerStream::new(listener);
213
214 info!("gRPC server is now serving requests on {}", addr);
215
216 router
218 .serve_with_incoming(incoming)
219 .await
220 .into_alien_error()
221 .context(ErrorData::GrpcServiceUnavailable {
222 service: "main".to_string(),
223 endpoint: addr.to_string(),
224 reason: "gRPC server failed to serve".to_string(),
225 })
226 });
227
228 Ok(GrpcServerHandles {
229 wait_until_server: wait_until_server_handle,
230 control_server: control_server_handle,
231 server_task,
232 readiness_receiver,
233 })
234}