1use futures::SinkExt;
2use spatio::Spatio;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use tokio::net::{TcpListener, TcpStream};
6use tokio_stream::StreamExt;
7use tokio_util::codec::Framed;
8use tracing::{debug, error, info};
9
10pub mod handler;
11
12use crate::handler::Handler;
13pub use spatio_rpc as rpc;
14pub use spatio_rpc::{RpcClientCodec, RpcServerCodec};
15use std::future::Future;
16use tokio::time::{Duration, timeout};
17
18const CONN_TIMEOUT: Duration = Duration::from_secs(30);
19const IDLE_TIMEOUT: Duration = Duration::from_secs(300);
20
21pub struct AppState {
22 pub handler: Arc<Handler>,
23}
24
25pub async fn run_server(
26 addr: SocketAddr,
27 db: Arc<Spatio>,
28 mut shutdown: impl Future<Output = ()> + Unpin + Send + 'static,
29) -> anyhow::Result<()> {
30 let state = Arc::new(AppState {
31 handler: Arc::new(crate::handler::Handler::new(db)),
32 });
33
34 let listener = TcpListener::bind(addr).await?;
35 info!("Spatio RPC Server listening on {}", addr);
36
37 loop {
38 tokio::select! {
39 accept_res = listener.accept() => {
40 match accept_res {
41 Ok((socket, _)) => {
42 let state = state.clone();
43 tokio::spawn(async move {
44 if let Err(e) = handle_connection(socket, state).await {
45 debug!("Connection closed: {}", e);
46 }
47 });
48 }
49 Err(e) => {
50 error!("Accept error: {}", e);
51 }
52 }
53 }
54 _ = &mut shutdown => {
55 info!("Shutdown signal received, stopping server...");
56 break;
57 }
58 }
59 }
60
61 Ok(())
62}
63
64pub async fn handle_connection(socket: TcpStream, state: Arc<AppState>) -> anyhow::Result<()> {
65 let mut framed = Framed::new(socket, RpcServerCodec);
66
67 while let Ok(Some(request)) = timeout(IDLE_TIMEOUT, framed.next()).await {
68 match request {
69 Ok(cmd) => {
70 debug!("Received command: {:?}", cmd);
71 let response = state.handler.handle(cmd).await;
72 timeout(CONN_TIMEOUT, framed.send(response)).await??;
73 }
74 Err(e) => {
75 error!("Failed to decode frame: {}", e);
76 return Err(e);
77 }
78 }
79 }
80
81 Ok(())
82}