Skip to main content

amaters_server/
service.rs

//! Network service module
//!
//! This module integrates the AQL gRPC service with the server runtime.
4
5use crate::config::ServerConfig;
6use crate::health::HealthChecker;
7use crate::metrics::MetricsCollector;
8use crate::server::{ServerError, ServerResult, Storage};
9use crate::shutdown::ShutdownCoordinator;
10use crate::tls_config::TlsServerBuilder;
11use amaters_net::grpc_service::AqlGrpcService;
12use amaters_net::{AqlServerBuilder, AqlServiceImpl};
13use std::net::SocketAddr;
14use std::sync::Arc;
15use tokio::sync::broadcast;
16use tokio::task::JoinHandle;
17use tracing::{error, info};
18
19/// Network service manager
20///
21/// Manages the AQL gRPC service
22pub struct NetworkService {
23    /// AQL service implementation
24    service: Arc<AqlServiceImpl<Storage>>,
25    /// Server configuration
26    config: Arc<ServerConfig>,
27    /// Health checker
28    health: HealthChecker,
29    /// Metrics collector
30    metrics: MetricsCollector,
31    /// Shutdown receiver
32    shutdown: ShutdownCoordinator,
33    /// Server task handle
34    server_handle: Option<JoinHandle<Result<(), tonic::transport::Error>>>,
35}
36
37impl NetworkService {
38    /// Create a new network service
39    pub fn new(
40        storage: Arc<Storage>,
41        config: Arc<ServerConfig>,
42        health: HealthChecker,
43        metrics: MetricsCollector,
44        shutdown: ShutdownCoordinator,
45    ) -> Self {
46        let service = Arc::new(AqlServerBuilder::new(storage).build());
47
48        Self {
49            service,
50            config,
51            health,
52            metrics,
53            shutdown,
54            server_handle: None,
55        }
56    }
57
58    /// Start the network service
59    pub async fn start(&mut self) -> ServerResult<()> {
60        let addr: SocketAddr = self
61            .config
62            .server
63            .bind_address
64            .parse()
65            .map_err(|e| ServerError::Config(format!("Invalid bind address: {}", e)))?;
66
67        info!("Starting AQL gRPC service on {}", addr);
68
69        // Build TLS configuration if enabled
70        let tls_config = TlsServerBuilder::build(&self.config)?;
71
72        if tls_config.is_some() {
73            info!("TLS enabled for gRPC server");
74        } else {
75            info!("TLS not enabled (development mode only)");
76        }
77
78        // Create gRPC service bridge
79        let grpc_service = AqlGrpcService::new(self.service.clone());
80
81        // Build gRPC server
82        use amaters_net::proto::aql::aql_service_server::AqlServiceServer;
83
84        let mut server_builder = tonic::transport::Server::builder();
85
86        // Add TLS if configured
87        if let Some(tls) = tls_config {
88            server_builder = server_builder
89                .tls_config(tls)
90                .map_err(|e| ServerError::TlsSetup(format!("Failed to configure TLS: {}", e)))?;
91        }
92
93        let server = server_builder.add_service(AqlServiceServer::new(grpc_service));
94
95        // Setup graceful shutdown
96        let mut shutdown_rx = self.shutdown.subscribe();
97
98        // Spawn server task
99        let handle = tokio::spawn(async move {
100            server
101                .serve_with_shutdown(addr, async {
102                    shutdown_rx.recv().await.ok();
103                    info!("Received shutdown signal, stopping gRPC server");
104                })
105                .await
106        });
107
108        self.server_handle = Some(handle);
109
110        info!("AQL gRPC service started successfully on {}", addr);
111        Ok(())
112    }
113
114    /// Stop the network service
115    pub async fn stop(&mut self) -> ServerResult<()> {
116        info!("Stopping network service");
117
118        if let Some(handle) = self.server_handle.take() {
119            // The server will shutdown gracefully via the shutdown signal
120            // Just wait for it to complete
121            match handle.await {
122                Ok(result) => {
123                    if let Err(e) = result {
124                        error!("gRPC server stopped with error: {}", e);
125                        return Err(ServerError::Network(format!("gRPC server error: {}", e)));
126                    }
127                }
128                Err(e) => {
129                    error!("Failed to join server task: {}", e);
130                    return Err(ServerError::Network(format!("Join error: {}", e)));
131                }
132            }
133            info!("Network service stopped");
134        }
135
136        Ok(())
137    }
138
139    /// Get reference to the AQL service
140    pub fn service(&self) -> &Arc<AqlServiceImpl<Storage>> {
141        &self.service
142    }
143}
144
145impl Drop for NetworkService {
146    fn drop(&mut self) {
147        if let Some(handle) = self.server_handle.take() {
148            handle.abort();
149        }
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ServerConfig;
    use amaters_core::storage::MemoryStorage;

    /// A freshly constructed service owns no server task.
    #[tokio::test]
    async fn test_network_service_creation() {
        let storage = Arc::new(Storage::Memory(MemoryStorage::new()));
        let config = Arc::new(ServerConfig::default());
        let health = HealthChecker::new();
        let metrics = MetricsCollector::new();
        let shutdown = ShutdownCoordinator::new();

        let service = NetworkService::new(storage, config, health, metrics, shutdown);
        assert!(service.server_handle.is_none());
    }

    /// The service can be started, signalled to shut down, and stopped cleanly.
    #[tokio::test]
    async fn test_network_service_start_stop() {
        let storage = Arc::new(Storage::Memory(MemoryStorage::new()));
        let mut config = ServerConfig::default();
        // Port 0 lets the OS pick a free port, so the test cannot collide with
        // another process or a concurrently running test on a fixed port.
        config.server.bind_address = "127.0.0.1:0".to_string();
        let config = Arc::new(config);
        let health = HealthChecker::new();
        let metrics = MetricsCollector::new();
        let shutdown = ShutdownCoordinator::new();

        let mut service = NetworkService::new(storage, config, health, metrics, shutdown.clone());

        // Start the service
        let result = service.start().await;
        assert!(result.is_ok());

        // Give the spawned task a moment to bind and begin serving
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // Trigger shutdown
        shutdown.shutdown();

        // Stop the service; a bind or serve failure would surface here
        let stop_result = service.stop().await;
        assert!(stop_result.is_ok());
    }
}