Skip to main content

alien_bindings/grpc/
server.rs

1use crate::{
2    error::{ErrorData, Result},
3    grpc::{
4        artifact_registry_service::{
5            alien_bindings::artifact_registry::FILE_DESCRIPTOR_SET as ARTIFACT_REGISTRY_FILE_DESCRIPTOR_SET,
6            ArtifactRegistryGrpcServer,
7        },
8        build_service::{
9            alien_bindings::build::FILE_DESCRIPTOR_SET as BUILD_FILE_DESCRIPTOR_SET,
10            BuildGrpcServer,
11        },
12        container_service::{
13            alien_bindings::container::FILE_DESCRIPTOR_SET as CONTAINER_FILE_DESCRIPTOR_SET,
14            ContainerGrpcServer,
15        },
16        control_service::{
17            alien_bindings::control::FILE_DESCRIPTOR_SET as CONTROL_FILE_DESCRIPTOR_SET,
18            ControlGrpcServer,
19        },
20        function_service::{
21            alien_bindings::function::FILE_DESCRIPTOR_SET as FUNCTION_FILE_DESCRIPTOR_SET,
22            FunctionGrpcServer,
23        },
24        kv_service::{
25            alien_bindings::kv::FILE_DESCRIPTOR_SET as KV_FILE_DESCRIPTOR_SET, KvGrpcServer,
26        },
27        queue_service::{
28            alien_bindings::queue::FILE_DESCRIPTOR_SET as QUEUE_FILE_DESCRIPTOR_SET,
29            QueueGrpcServer,
30        },
31        service_account_service::{
32            alien_bindings::service_account::FILE_DESCRIPTOR_SET as SERVICE_ACCOUNT_FILE_DESCRIPTOR_SET,
33            ServiceAccountGrpcServer,
34        },
35        storage_service::{
36            alien_bindings::storage::FILE_DESCRIPTOR_SET as STORAGE_FILE_DESCRIPTOR_SET,
37            StorageGrpcServer,
38        },
39        vault_service::{
40            alien_bindings::vault::FILE_DESCRIPTOR_SET as VAULT_FILE_DESCRIPTOR_SET,
41            VaultGrpcServer,
42        },
43        wait_until_service::{
44            alien_bindings::wait_until::FILE_DESCRIPTOR_SET as WAIT_UNTIL_FILE_DESCRIPTOR_SET,
45            WaitUntilGrpcServer,
46        },
47        MAX_GRPC_MESSAGE_SIZE,
48    },
49    BindingsProviderApi,
50};
51use alien_error::{Context, IntoAlienError};
52use std::sync::Arc;
53use tokio::sync::oneshot;
54use tokio_stream::wrappers::TcpListenerStream;
55use tonic::transport::Server;
56use tracing::info;
57
58/// Handles returned from run_grpc_server
59pub struct GrpcServerHandles {
60    pub wait_until_server: Arc<WaitUntilGrpcServer>,
61    pub control_server: Arc<ControlGrpcServer>,
62    pub server_task: tokio::task::JoinHandle<Result<()>>,
63    pub readiness_receiver: oneshot::Receiver<()>,
64}
65
66/// Configures and runs the main alien-bindings gRPC server.
67///
68/// This server will host all implemented gRPC services (Storage, KV, Queue, Control, etc.).
69/// Returns handles for drain coordination, control service, and readiness notification.
70pub async fn run_grpc_server(
71    provider: Arc<dyn BindingsProviderApi>,
72    addr_str: &str,
73) -> Result<GrpcServerHandles> {
74    let addr: std::net::SocketAddr =
75        addr_str
76            .parse()
77            .into_alien_error()
78            .context(ErrorData::ServerBindFailed {
79                address: addr_str.to_string(),
80                reason: "Invalid address format".to_string(),
81            })?;
82
83    info!("Configuring gRPC server for {}", addr);
84
85    let wait_until_server = Arc::new(WaitUntilGrpcServer::new(provider.clone()));
86    let wait_until_server_handle = wait_until_server.clone();
87
88    let control_server = Arc::new(ControlGrpcServer::new());
89    let control_server_handle = control_server.clone();
90
91    let mut router = Server::builder()
92        .concurrency_limit_per_connection(256) // Allow many concurrent requests per connection
93        .add_service(
94            StorageGrpcServer::new(provider.clone())
95                .into_service()
96                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
97        )
98        .add_service(
99            BuildGrpcServer::new(provider.clone())
100                .into_service()
101                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
102        )
103        .add_service(
104            ArtifactRegistryGrpcServer::new(provider.clone())
105                .into_service()
106                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
107        )
108        .add_service(
109            VaultGrpcServer::new(provider.clone())
110                .into_service()
111                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
112        )
113        .add_service(
114            KvGrpcServer::new(provider.clone())
115                .into_service()
116                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
117        )
118        .add_service(
119            QueueGrpcServer::new(provider.clone())
120                .into_service()
121                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
122        )
123        .add_service(
124            FunctionGrpcServer::new(provider.clone())
125                .into_service()
126                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
127        )
128        .add_service(
129            ContainerGrpcServer::new(provider.clone())
130                .into_service()
131                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
132        )
133        .add_service(
134            ServiceAccountGrpcServer::new(provider.clone())
135                .into_service()
136                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
137        )
138        .add_service(
139            (*wait_until_server)
140                .clone()
141                .into_service()
142                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
143        )
144        .add_service(
145            (*control_server)
146                .clone()
147                .into_service()
148                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
149        );
150
151    // Add reflection service
152    let reflection_service = tonic_reflection::server::Builder::configure()
153        .register_encoded_file_descriptor_set(STORAGE_FILE_DESCRIPTOR_SET)
154        .register_encoded_file_descriptor_set(BUILD_FILE_DESCRIPTOR_SET)
155        .register_encoded_file_descriptor_set(ARTIFACT_REGISTRY_FILE_DESCRIPTOR_SET)
156        .register_encoded_file_descriptor_set(VAULT_FILE_DESCRIPTOR_SET)
157        .register_encoded_file_descriptor_set(KV_FILE_DESCRIPTOR_SET)
158        .register_encoded_file_descriptor_set(QUEUE_FILE_DESCRIPTOR_SET)
159        .register_encoded_file_descriptor_set(FUNCTION_FILE_DESCRIPTOR_SET)
160        .register_encoded_file_descriptor_set(CONTAINER_FILE_DESCRIPTOR_SET)
161        .register_encoded_file_descriptor_set(SERVICE_ACCOUNT_FILE_DESCRIPTOR_SET)
162        .register_encoded_file_descriptor_set(WAIT_UNTIL_FILE_DESCRIPTOR_SET)
163        .register_encoded_file_descriptor_set(CONTROL_FILE_DESCRIPTOR_SET)
164        .build_v1()
165        .into_alien_error()
166        .context(ErrorData::GrpcServiceUnavailable {
167            service: "reflection".to_string(),
168            endpoint: addr.to_string(),
169            reason: "Failed to build reflection service".to_string(),
170        })?;
171
172    router = router.add_service(reflection_service);
173
174    // Create a oneshot channel to signal when the server is ready
175    let (readiness_sender, readiness_receiver) = oneshot::channel();
176
177    let server_task = tokio::spawn(async move {
178        // Bind to the address first to ensure it's available
179        let listener = tokio::net::TcpListener::bind(addr)
180            .await
181            .into_alien_error()
182            .context(ErrorData::ServerBindFailed {
183                address: addr.to_string(),
184                reason: "Failed to bind to address".to_string(),
185            })?;
186
187        info!("gRPC server successfully bound to {}", addr);
188
189        // Signal that the server is ready to accept connections
190        if let Err(_) = readiness_sender.send(()) {
191            // The receiver was dropped, but we can still continue
192            info!("Readiness receiver was dropped, continuing with server startup");
193        }
194
195        // Convert the TcpListener to a stream of incoming connections
196        let incoming = TcpListenerStream::new(listener);
197
198        info!("gRPC server is now serving requests on {}", addr);
199
200        // Serve with the incoming connection stream
201        router
202            .serve_with_incoming(incoming)
203            .await
204            .into_alien_error()
205            .context(ErrorData::GrpcServiceUnavailable {
206                service: "main".to_string(),
207                endpoint: addr.to_string(),
208                reason: "gRPC server failed to serve".to_string(),
209            })
210    });
211
212    Ok(GrpcServerHandles {
213        wait_until_server: wait_until_server_handle,
214        control_server: control_server_handle,
215        server_task,
216        readiness_receiver,
217    })
218}