Skip to main content

alien_bindings/grpc/
server.rs

1use crate::{
2    error::{ErrorData, Result},
3    grpc::{
4        artifact_registry_service::{
5            alien_bindings::artifact_registry::FILE_DESCRIPTOR_SET as ARTIFACT_REGISTRY_FILE_DESCRIPTOR_SET,
6            ArtifactRegistryGrpcServer,
7        },
8        build_service::{
9            alien_bindings::build::FILE_DESCRIPTOR_SET as BUILD_FILE_DESCRIPTOR_SET,
10            BuildGrpcServer,
11        },
12        container_service::{
13            alien_bindings::container::FILE_DESCRIPTOR_SET as CONTAINER_FILE_DESCRIPTOR_SET,
14            ContainerGrpcServer,
15        },
16        control_service::{
17            alien_bindings::control::FILE_DESCRIPTOR_SET as CONTROL_FILE_DESCRIPTOR_SET,
18            ControlGrpcServer,
19        },
20        kv_service::{
21            alien_bindings::kv::FILE_DESCRIPTOR_SET as KV_FILE_DESCRIPTOR_SET, KvGrpcServer,
22        },
23        queue_service::{
24            alien_bindings::queue::FILE_DESCRIPTOR_SET as QUEUE_FILE_DESCRIPTOR_SET,
25            QueueGrpcServer,
26        },
27        service_account_service::{
28            alien_bindings::service_account::FILE_DESCRIPTOR_SET as SERVICE_ACCOUNT_FILE_DESCRIPTOR_SET,
29            ServiceAccountGrpcServer,
30        },
31        storage_service::{
32            alien_bindings::storage::FILE_DESCRIPTOR_SET as STORAGE_FILE_DESCRIPTOR_SET,
33            StorageGrpcServer,
34        },
35        vault_service::{
36            alien_bindings::vault::FILE_DESCRIPTOR_SET as VAULT_FILE_DESCRIPTOR_SET,
37            VaultGrpcServer,
38        },
39        wait_until_service::{
40            alien_bindings::wait_until::FILE_DESCRIPTOR_SET as WAIT_UNTIL_FILE_DESCRIPTOR_SET,
41            WaitUntilGrpcServer,
42        },
43        worker_service::{
44            alien_bindings::worker::FILE_DESCRIPTOR_SET as WORKER_FILE_DESCRIPTOR_SET,
45            WorkerGrpcServer,
46        },
47        MAX_GRPC_MESSAGE_SIZE,
48    },
49    BindingsProviderApi,
50};
51use alien_error::{Context, IntoAlienError};
52use std::sync::Arc;
53use tokio::sync::oneshot;
54use tokio_stream::wrappers::TcpListenerStream;
55use tonic::transport::Server;
56use tracing::info;
57
58/// Handles returned from run_grpc_server
59pub struct GrpcServerHandles {
60    pub wait_until_server: Arc<WaitUntilGrpcServer>,
61    pub control_server: Arc<ControlGrpcServer>,
62    pub server_task: tokio::task::JoinHandle<Result<()>>,
63    pub readiness_receiver: oneshot::Receiver<()>,
64}
65
66/// Configures and runs the main alien-bindings gRPC server.
67///
68/// This server will host all implemented gRPC services (Storage, KV, Queue, Control, etc.).
69/// Returns handles for drain coordination, control service, and readiness notification.
70pub async fn run_grpc_server(
71    provider: Arc<dyn BindingsProviderApi>,
72    addr_str: &str,
73) -> Result<GrpcServerHandles> {
74    let addr: std::net::SocketAddr =
75        addr_str
76            .parse()
77            .into_alien_error()
78            .context(ErrorData::ServerBindFailed {
79                address: addr_str.to_string(),
80                reason: "Invalid address format".to_string(),
81            })?;
82
83    info!("Configuring gRPC server for {}", addr);
84
85    // The bindings gRPC server is unauthenticated by design — it's intra-machine
86    // IPC between alien-runtime and the application code it manages, and they
87    // share a trust boundary. Binding to a non-loopback address erases that
88    // assumption (think `0.0.0.0` from a misconfigured ALIEN_BINDINGS_ADDRESS,
89    // `--network=host` Docker, or shared-pod sidecar). Make the misconfiguration
90    // loud at startup rather than silently exposing Vault/Storage/KV.
91    if !addr.ip().is_loopback() {
92        tracing::warn!(
93            address = %addr,
94            "alien-runtime gRPC server is binding to a NON-LOOPBACK address. \
95            This exposes the bindings server (Vault, Storage, KV, Control) to anyone who can \
96            reach this network interface. The server has no authentication. Set \
97            ALIEN_BINDINGS_ADDRESS=127.0.0.1:51351 unless you have a specific reason to expose it."
98        );
99    }
100
101    let wait_until_server = Arc::new(WaitUntilGrpcServer::new());
102    let wait_until_server_handle = wait_until_server.clone();
103
104    let control_server = Arc::new(ControlGrpcServer::new());
105    let control_server_handle = control_server.clone();
106
107    let mut router = Server::builder()
108        .concurrency_limit_per_connection(256) // Allow many concurrent requests per connection
109        .add_service(
110            StorageGrpcServer::new(provider.clone())
111                .into_service()
112                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
113        )
114        .add_service(
115            BuildGrpcServer::new(provider.clone())
116                .into_service()
117                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
118        )
119        .add_service(
120            ArtifactRegistryGrpcServer::new(provider.clone())
121                .into_service()
122                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
123        )
124        .add_service(
125            VaultGrpcServer::new(provider.clone())
126                .into_service()
127                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
128        )
129        .add_service(
130            KvGrpcServer::new(provider.clone())
131                .into_service()
132                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
133        )
134        .add_service(
135            QueueGrpcServer::new(provider.clone())
136                .into_service()
137                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
138        )
139        .add_service(
140            WorkerGrpcServer::new(provider.clone())
141                .into_service()
142                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
143        )
144        .add_service(
145            ContainerGrpcServer::new(provider.clone())
146                .into_service()
147                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
148        )
149        .add_service(
150            ServiceAccountGrpcServer::new(provider.clone())
151                .into_service()
152                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
153        )
154        .add_service(
155            (*wait_until_server)
156                .clone()
157                .into_service()
158                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
159        )
160        .add_service(
161            (*control_server)
162                .clone()
163                .into_service()
164                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
165        );
166
167    // Add reflection service
168    let reflection_service = tonic_reflection::server::Builder::configure()
169        .register_encoded_file_descriptor_set(STORAGE_FILE_DESCRIPTOR_SET)
170        .register_encoded_file_descriptor_set(BUILD_FILE_DESCRIPTOR_SET)
171        .register_encoded_file_descriptor_set(ARTIFACT_REGISTRY_FILE_DESCRIPTOR_SET)
172        .register_encoded_file_descriptor_set(VAULT_FILE_DESCRIPTOR_SET)
173        .register_encoded_file_descriptor_set(KV_FILE_DESCRIPTOR_SET)
174        .register_encoded_file_descriptor_set(QUEUE_FILE_DESCRIPTOR_SET)
175        .register_encoded_file_descriptor_set(WORKER_FILE_DESCRIPTOR_SET)
176        .register_encoded_file_descriptor_set(CONTAINER_FILE_DESCRIPTOR_SET)
177        .register_encoded_file_descriptor_set(SERVICE_ACCOUNT_FILE_DESCRIPTOR_SET)
178        .register_encoded_file_descriptor_set(WAIT_UNTIL_FILE_DESCRIPTOR_SET)
179        .register_encoded_file_descriptor_set(CONTROL_FILE_DESCRIPTOR_SET)
180        .build_v1()
181        .into_alien_error()
182        .context(ErrorData::GrpcServiceUnavailable {
183            service: "reflection".to_string(),
184            endpoint: addr.to_string(),
185            reason: "Failed to build reflection service".to_string(),
186        })?;
187
188    router = router.add_service(reflection_service);
189
190    // Create a oneshot channel to signal when the server is ready
191    let (readiness_sender, readiness_receiver) = oneshot::channel();
192
193    let server_task = tokio::spawn(async move {
194        // Bind to the address first to ensure it's available
195        let listener = tokio::net::TcpListener::bind(addr)
196            .await
197            .into_alien_error()
198            .context(ErrorData::ServerBindFailed {
199                address: addr.to_string(),
200                reason: "Failed to bind to address".to_string(),
201            })?;
202
203        info!("gRPC server successfully bound to {}", addr);
204
205        // Signal that the server is ready to accept connections
206        if let Err(_) = readiness_sender.send(()) {
207            // The receiver was dropped, but we can still continue
208            info!("Readiness receiver was dropped, continuing with server startup");
209        }
210
211        // Convert the TcpListener to a stream of incoming connections
212        let incoming = TcpListenerStream::new(listener);
213
214        info!("gRPC server is now serving requests on {}", addr);
215
216        // Serve with the incoming connection stream
217        router
218            .serve_with_incoming(incoming)
219            .await
220            .into_alien_error()
221            .context(ErrorData::GrpcServiceUnavailable {
222                service: "main".to_string(),
223                endpoint: addr.to_string(),
224                reason: "gRPC server failed to serve".to_string(),
225            })
226    });
227
228    Ok(GrpcServerHandles {
229        wait_until_server: wait_until_server_handle,
230        control_server: control_server_handle,
231        server_task,
232        readiness_receiver,
233    })
234}