amaters_server/
service.rs1use crate::config::ServerConfig;
6use crate::health::HealthChecker;
7use crate::metrics::MetricsCollector;
8use crate::server::{ServerError, ServerResult, Storage};
9use crate::shutdown::ShutdownCoordinator;
10use crate::tls_config::TlsServerBuilder;
11use amaters_net::grpc_service::AqlGrpcService;
12use amaters_net::{AqlServerBuilder, AqlServiceImpl};
13use std::net::SocketAddr;
14use std::sync::Arc;
15use tokio::sync::broadcast;
16use tokio::task::JoinHandle;
17use tracing::{error, info};
18
19pub struct NetworkService {
23 service: Arc<AqlServiceImpl<Storage>>,
25 config: Arc<ServerConfig>,
27 health: HealthChecker,
29 metrics: MetricsCollector,
31 shutdown: ShutdownCoordinator,
33 server_handle: Option<JoinHandle<Result<(), tonic::transport::Error>>>,
35}
36
37impl NetworkService {
38 pub fn new(
40 storage: Arc<Storage>,
41 config: Arc<ServerConfig>,
42 health: HealthChecker,
43 metrics: MetricsCollector,
44 shutdown: ShutdownCoordinator,
45 ) -> Self {
46 let service = Arc::new(AqlServerBuilder::new(storage).build());
47
48 Self {
49 service,
50 config,
51 health,
52 metrics,
53 shutdown,
54 server_handle: None,
55 }
56 }
57
58 pub async fn start(&mut self) -> ServerResult<()> {
60 let addr: SocketAddr = self
61 .config
62 .server
63 .bind_address
64 .parse()
65 .map_err(|e| ServerError::Config(format!("Invalid bind address: {}", e)))?;
66
67 info!("Starting AQL gRPC service on {}", addr);
68
69 let tls_config = TlsServerBuilder::build(&self.config)?;
71
72 if tls_config.is_some() {
73 info!("TLS enabled for gRPC server");
74 } else {
75 info!("TLS not enabled (development mode only)");
76 }
77
78 let grpc_service = AqlGrpcService::new(self.service.clone());
80
81 use amaters_net::proto::aql::aql_service_server::AqlServiceServer;
83
84 let mut server_builder = tonic::transport::Server::builder();
85
86 if let Some(tls) = tls_config {
88 server_builder = server_builder
89 .tls_config(tls)
90 .map_err(|e| ServerError::TlsSetup(format!("Failed to configure TLS: {}", e)))?;
91 }
92
93 let server = server_builder.add_service(AqlServiceServer::new(grpc_service));
94
95 let mut shutdown_rx = self.shutdown.subscribe();
97
98 let handle = tokio::spawn(async move {
100 server
101 .serve_with_shutdown(addr, async {
102 shutdown_rx.recv().await.ok();
103 info!("Received shutdown signal, stopping gRPC server");
104 })
105 .await
106 });
107
108 self.server_handle = Some(handle);
109
110 info!("AQL gRPC service started successfully on {}", addr);
111 Ok(())
112 }
113
114 pub async fn stop(&mut self) -> ServerResult<()> {
116 info!("Stopping network service");
117
118 if let Some(handle) = self.server_handle.take() {
119 match handle.await {
122 Ok(result) => {
123 if let Err(e) = result {
124 error!("gRPC server stopped with error: {}", e);
125 return Err(ServerError::Network(format!("gRPC server error: {}", e)));
126 }
127 }
128 Err(e) => {
129 error!("Failed to join server task: {}", e);
130 return Err(ServerError::Network(format!("Join error: {}", e)));
131 }
132 }
133 info!("Network service stopped");
134 }
135
136 Ok(())
137 }
138
139 pub fn service(&self) -> &Arc<AqlServiceImpl<Storage>> {
141 &self.service
142 }
143}
144
145impl Drop for NetworkService {
146 fn drop(&mut self) {
147 if let Some(handle) = self.server_handle.take() {
148 handle.abort();
149 }
150 }
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ServerConfig;
    use amaters_core::storage::MemoryStorage;

    /// A freshly constructed service must not track a server task yet.
    #[tokio::test]
    async fn test_network_service_creation() {
        let storage = Arc::new(Storage::Memory(MemoryStorage::new()));
        let config = Arc::new(ServerConfig::default());
        let health = HealthChecker::new();
        let metrics = MetricsCollector::new();
        let shutdown = ShutdownCoordinator::new();

        let service = NetworkService::new(storage, config, health, metrics, shutdown);
        assert!(service.server_handle.is_none());
    }

    /// Starting the service, signalling shutdown and stopping it must all succeed.
    #[tokio::test]
    async fn test_network_service_start_stop() {
        let storage = Arc::new(Storage::Memory(MemoryStorage::new()));
        let mut config = ServerConfig::default();
        // Port 0 lets the OS pick a free port, so the test cannot fail because
        // a fixed port is already taken by another process or a parallel test.
        config.server.bind_address = "127.0.0.1:0".to_string();
        let config = Arc::new(config);
        let health = HealthChecker::new();
        let metrics = MetricsCollector::new();
        let shutdown = ShutdownCoordinator::new();

        let mut service = NetworkService::new(storage, config, health, metrics, shutdown.clone());

        let result = service.start().await;
        assert!(result.is_ok());

        // Give the spawned server task a moment to run before shutting down.
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        shutdown.shutdown();

        let stop_result = service.stop().await;
        assert!(stop_result.is_ok());
    }
}