alien_bindings/grpc/
server.rs1use crate::{
2 error::{ErrorData, Result},
3 grpc::{
4 artifact_registry_service::{
5 alien_bindings::artifact_registry::FILE_DESCRIPTOR_SET as ARTIFACT_REGISTRY_FILE_DESCRIPTOR_SET,
6 ArtifactRegistryGrpcServer,
7 },
8 build_service::{
9 alien_bindings::build::FILE_DESCRIPTOR_SET as BUILD_FILE_DESCRIPTOR_SET,
10 BuildGrpcServer,
11 },
12 container_service::{
13 alien_bindings::container::FILE_DESCRIPTOR_SET as CONTAINER_FILE_DESCRIPTOR_SET,
14 ContainerGrpcServer,
15 },
16 control_service::{
17 alien_bindings::control::FILE_DESCRIPTOR_SET as CONTROL_FILE_DESCRIPTOR_SET,
18 ControlGrpcServer,
19 },
20 function_service::{
21 alien_bindings::function::FILE_DESCRIPTOR_SET as FUNCTION_FILE_DESCRIPTOR_SET,
22 FunctionGrpcServer,
23 },
24 kv_service::{
25 alien_bindings::kv::FILE_DESCRIPTOR_SET as KV_FILE_DESCRIPTOR_SET, KvGrpcServer,
26 },
27 queue_service::{
28 alien_bindings::queue::FILE_DESCRIPTOR_SET as QUEUE_FILE_DESCRIPTOR_SET,
29 QueueGrpcServer,
30 },
31 service_account_service::{
32 alien_bindings::service_account::FILE_DESCRIPTOR_SET as SERVICE_ACCOUNT_FILE_DESCRIPTOR_SET,
33 ServiceAccountGrpcServer,
34 },
35 storage_service::{
36 alien_bindings::storage::FILE_DESCRIPTOR_SET as STORAGE_FILE_DESCRIPTOR_SET,
37 StorageGrpcServer,
38 },
39 vault_service::{
40 alien_bindings::vault::FILE_DESCRIPTOR_SET as VAULT_FILE_DESCRIPTOR_SET,
41 VaultGrpcServer,
42 },
43 wait_until_service::{
44 alien_bindings::wait_until::FILE_DESCRIPTOR_SET as WAIT_UNTIL_FILE_DESCRIPTOR_SET,
45 WaitUntilGrpcServer,
46 },
47 MAX_GRPC_MESSAGE_SIZE,
48 },
49 BindingsProviderApi,
50};
51use alien_error::{Context, IntoAlienError};
52use std::sync::Arc;
53use tokio::sync::oneshot;
54use tokio_stream::wrappers::TcpListenerStream;
55use tonic::transport::Server;
56use tracing::info;
57
58pub struct GrpcServerHandles {
60 pub wait_until_server: Arc<WaitUntilGrpcServer>,
61 pub control_server: Arc<ControlGrpcServer>,
62 pub server_task: tokio::task::JoinHandle<Result<()>>,
63 pub readiness_receiver: oneshot::Receiver<()>,
64}
65
66pub async fn run_grpc_server(
71 provider: Arc<dyn BindingsProviderApi>,
72 addr_str: &str,
73) -> Result<GrpcServerHandles> {
74 let addr: std::net::SocketAddr =
75 addr_str
76 .parse()
77 .into_alien_error()
78 .context(ErrorData::ServerBindFailed {
79 address: addr_str.to_string(),
80 reason: "Invalid address format".to_string(),
81 })?;
82
83 info!("Configuring gRPC server for {}", addr);
84
85 let wait_until_server = Arc::new(WaitUntilGrpcServer::new(provider.clone()));
86 let wait_until_server_handle = wait_until_server.clone();
87
88 let control_server = Arc::new(ControlGrpcServer::new());
89 let control_server_handle = control_server.clone();
90
91 let mut router = Server::builder()
92 .concurrency_limit_per_connection(256) .add_service(
94 StorageGrpcServer::new(provider.clone())
95 .into_service()
96 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
97 )
98 .add_service(
99 BuildGrpcServer::new(provider.clone())
100 .into_service()
101 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
102 )
103 .add_service(
104 ArtifactRegistryGrpcServer::new(provider.clone())
105 .into_service()
106 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
107 )
108 .add_service(
109 VaultGrpcServer::new(provider.clone())
110 .into_service()
111 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
112 )
113 .add_service(
114 KvGrpcServer::new(provider.clone())
115 .into_service()
116 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
117 )
118 .add_service(
119 QueueGrpcServer::new(provider.clone())
120 .into_service()
121 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
122 )
123 .add_service(
124 FunctionGrpcServer::new(provider.clone())
125 .into_service()
126 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
127 )
128 .add_service(
129 ContainerGrpcServer::new(provider.clone())
130 .into_service()
131 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
132 )
133 .add_service(
134 ServiceAccountGrpcServer::new(provider.clone())
135 .into_service()
136 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
137 )
138 .add_service(
139 (*wait_until_server)
140 .clone()
141 .into_service()
142 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
143 )
144 .add_service(
145 (*control_server)
146 .clone()
147 .into_service()
148 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
149 );
150
151 let reflection_service = tonic_reflection::server::Builder::configure()
153 .register_encoded_file_descriptor_set(STORAGE_FILE_DESCRIPTOR_SET)
154 .register_encoded_file_descriptor_set(BUILD_FILE_DESCRIPTOR_SET)
155 .register_encoded_file_descriptor_set(ARTIFACT_REGISTRY_FILE_DESCRIPTOR_SET)
156 .register_encoded_file_descriptor_set(VAULT_FILE_DESCRIPTOR_SET)
157 .register_encoded_file_descriptor_set(KV_FILE_DESCRIPTOR_SET)
158 .register_encoded_file_descriptor_set(QUEUE_FILE_DESCRIPTOR_SET)
159 .register_encoded_file_descriptor_set(FUNCTION_FILE_DESCRIPTOR_SET)
160 .register_encoded_file_descriptor_set(CONTAINER_FILE_DESCRIPTOR_SET)
161 .register_encoded_file_descriptor_set(SERVICE_ACCOUNT_FILE_DESCRIPTOR_SET)
162 .register_encoded_file_descriptor_set(WAIT_UNTIL_FILE_DESCRIPTOR_SET)
163 .register_encoded_file_descriptor_set(CONTROL_FILE_DESCRIPTOR_SET)
164 .build_v1()
165 .into_alien_error()
166 .context(ErrorData::GrpcServiceUnavailable {
167 service: "reflection".to_string(),
168 endpoint: addr.to_string(),
169 reason: "Failed to build reflection service".to_string(),
170 })?;
171
172 router = router.add_service(reflection_service);
173
174 let (readiness_sender, readiness_receiver) = oneshot::channel();
176
177 let server_task = tokio::spawn(async move {
178 let listener = tokio::net::TcpListener::bind(addr)
180 .await
181 .into_alien_error()
182 .context(ErrorData::ServerBindFailed {
183 address: addr.to_string(),
184 reason: "Failed to bind to address".to_string(),
185 })?;
186
187 info!("gRPC server successfully bound to {}", addr);
188
189 if let Err(_) = readiness_sender.send(()) {
191 info!("Readiness receiver was dropped, continuing with server startup");
193 }
194
195 let incoming = TcpListenerStream::new(listener);
197
198 info!("gRPC server is now serving requests on {}", addr);
199
200 router
202 .serve_with_incoming(incoming)
203 .await
204 .into_alien_error()
205 .context(ErrorData::GrpcServiceUnavailable {
206 service: "main".to_string(),
207 endpoint: addr.to_string(),
208 reason: "gRPC server failed to serve".to_string(),
209 })
210 });
211
212 Ok(GrpcServerHandles {
213 wait_until_server: wait_until_server_handle,
214 control_server: control_server_handle,
215 server_task,
216 readiness_receiver,
217 })
218}