spatio_server/
lib.rs

1use futures::SinkExt;
2use spatio::Spatio;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use tokio::net::{TcpListener, TcpStream};
6use tokio_stream::StreamExt;
7use tokio_util::codec::Framed;
8use tracing::{debug, error, info};
9
10pub mod handler;
11
12use crate::handler::Handler;
13pub use spatio_rpc as rpc;
14pub use spatio_rpc::{RpcClientCodec, RpcServerCodec};
15use std::future::Future;
16use tokio::time::{Duration, timeout};
17
18const CONN_TIMEOUT: Duration = Duration::from_secs(30);
19const IDLE_TIMEOUT: Duration = Duration::from_secs(300);
20
21pub struct AppState {
22    pub handler: Arc<Handler>,
23}
24
25pub async fn run_server(
26    addr: SocketAddr,
27    db: Arc<Spatio>,
28    mut shutdown: impl Future<Output = ()> + Unpin + Send + 'static,
29) -> anyhow::Result<()> {
30    let state = Arc::new(AppState {
31        handler: Arc::new(crate::handler::Handler::new(db)),
32    });
33
34    let listener = TcpListener::bind(addr).await?;
35    info!("Spatio RPC Server listening on {}", addr);
36
37    loop {
38        tokio::select! {
39            accept_res = listener.accept() => {
40                match accept_res {
41                    Ok((socket, _)) => {
42                        let state = state.clone();
43                        tokio::spawn(async move {
44                            if let Err(e) = handle_connection(socket, state).await {
45                                debug!("Connection closed: {}", e);
46                            }
47                        });
48                    }
49                    Err(e) => {
50                        error!("Accept error: {}", e);
51                    }
52                }
53            }
54            _ = &mut shutdown => {
55                info!("Shutdown signal received, stopping server...");
56                break;
57            }
58        }
59    }
60
61    Ok(())
62}
63
64pub async fn handle_connection(socket: TcpStream, state: Arc<AppState>) -> anyhow::Result<()> {
65    let mut framed = Framed::new(socket, RpcServerCodec);
66
67    while let Ok(Some(request)) = timeout(IDLE_TIMEOUT, framed.next()).await {
68        match request {
69            Ok(cmd) => {
70                debug!("Received command: {:?}", cmd);
71                let response = state.handler.handle(cmd).await;
72                timeout(CONN_TIMEOUT, framed.send(response)).await??;
73            }
74            Err(e) => {
75                error!("Failed to decode frame: {}", e);
76                return Err(e);
77            }
78        }
79    }
80
81    Ok(())
82}