use std::error;
use std::future::Future;
use std::net::SocketAddr;
#[cfg(unix)]
use std::path::Path;
use std::pin::Pin;
use serde::de::DeserializeOwned;
use tonic::transport::Server;
use crate::server::services::GrpcSimulationService;
use crate::simulation::SimInit;
use super::codegen::simulation::simulation_server;
/// Runs a gRPC simulation server listening on a network socket address.
///
/// The `sim_gen` closure is wrapped into a [`GrpcSimulationService`], which is
/// then handed to `run_service` together with `addr` and no shutdown signal.
///
/// # Errors
///
/// Propagates any error reported by `run_service`.
pub fn run<F, I>(sim_gen: F, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>>
where
    F: FnMut(I) -> Result<SimInit, Box<dyn error::Error>> + Send + 'static,
    I: DeserializeOwned,
{
    let grpc_service = GrpcSimulationService::new(sim_gen);

    run_service(grpc_service, addr, None)
}
/// Runs a gRPC simulation server on a network socket address, with a
/// caller-provided shutdown signal.
///
/// Behaves like the signal-less entry point, except that the `signal` future
/// is pinned on the heap and forwarded to `run_service`, which passes it to
/// the server's `serve_with_shutdown` method.
///
/// # Errors
///
/// Propagates any error reported by `run_service`.
pub fn run_with_shutdown<F, I, S>(
    sim_gen: F,
    addr: SocketAddr,
    signal: S,
) -> Result<(), Box<dyn std::error::Error>>
where
    F: FnMut(I) -> Result<SimInit, Box<dyn error::Error>> + Send + 'static,
    I: DeserializeOwned,
    for<'a> S: Future<Output = ()> + 'a,
{
    let grpc_service = GrpcSimulationService::new(sim_gen);

    // The signal is boxed at the call site so that it coerces to the
    // type-erased future expected by `run_service`.
    run_service(grpc_service, addr, Some(Box::pin(signal)))
}
/// Serves `service` over gRPC at `addr`, blocking the calling thread.
///
/// A current-thread tokio runtime (with the time and I/O drivers enabled) is
/// built and used to drive the tonic server. When `signal` is provided, it is
/// passed to `serve_with_shutdown`; otherwise plain `serve` is used.
///
/// # Errors
///
/// Returns an error if the runtime cannot be built or if the server reports
/// an error.
fn run_service(
    service: GrpcSimulationService,
    addr: SocketAddr,
    signal: Option<Pin<Box<dyn Future<Output = ()>>>>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    let runtime = builder.enable_time().enable_io().build()?;

    runtime.block_on(async move {
        let router =
            Server::builder().add_service(simulation_server::SimulationServer::new(service));

        if let Some(shutdown) = signal {
            router.serve_with_shutdown(addr, shutdown).await?;
        } else {
            router.serve(addr).await?;
        }

        Ok(())
    })
}
/// Runs a gRPC simulation server listening on a Unix Domain Socket.
///
/// The `sim_gen` closure is wrapped into a [`GrpcSimulationService`], which is
/// then handed to `run_local_service` together with the socket `path` and no
/// shutdown signal.
///
/// # Errors
///
/// Propagates any error reported by `run_local_service`.
#[cfg(unix)]
pub fn run_local<F, I, P>(sim_gen: F, path: P) -> Result<(), Box<dyn std::error::Error>>
where
    F: FnMut(I) -> Result<SimInit, Box<dyn error::Error>> + Send + 'static,
    I: DeserializeOwned,
    P: AsRef<Path>,
{
    let grpc_service = GrpcSimulationService::new(sim_gen);

    run_local_service(grpc_service, path.as_ref(), None)
}
/// Runs a gRPC simulation server on a Unix Domain Socket, with a
/// caller-provided shutdown signal.
///
/// Behaves like the signal-less local entry point, except that the `signal`
/// future is pinned on the heap and forwarded to `run_local_service`, which
/// passes it to the server's `serve_with_incoming_shutdown` method.
///
/// # Errors
///
/// Propagates any error reported by `run_local_service`.
#[cfg(unix)]
pub fn run_local_with_shutdown<F, I, P, S>(
    sim_gen: F,
    path: P,
    signal: S,
) -> Result<(), Box<dyn std::error::Error>>
where
    F: FnMut(I) -> Result<SimInit, Box<dyn error::Error>> + Send + 'static,
    I: DeserializeOwned,
    P: AsRef<Path>,
    for<'a> S: Future<Output = ()> + 'a,
{
    let grpc_service = GrpcSimulationService::new(sim_gen);

    // The signal is boxed at the call site so that it coerces to the
    // type-erased future expected by `run_local_service`.
    run_local_service(grpc_service, path.as_ref(), Some(Box::pin(signal)))
}
/// Serves `service` over gRPC on a Unix Domain Socket bound at `path`,
/// blocking the calling thread.
///
/// Before binding:
/// - if `path` refers to an existing socket file, that file is removed;
/// - if `path` refers to an existing file that is not a socket, an
///   `AlreadyExists` I/O error is returned and nothing is modified;
/// - the parent directory of `path`, if any, is created when missing.
///
/// A current-thread tokio runtime (with the time and I/O drivers enabled) is
/// then built and used to drive the tonic server. When `signal` is provided,
/// it is passed to `serve_with_incoming_shutdown`; otherwise
/// `serve_with_incoming` is used.
///
/// # Errors
///
/// Returns an error if the file system preparation fails, if the runtime
/// cannot be built, if the socket cannot be bound, or if the server reports an
/// error.
#[cfg(unix)]
fn run_local_service(
    service: GrpcSimulationService,
    path: &Path,
    signal: Option<Pin<Box<dyn Future<Output = ()>>>>,
) -> Result<(), Box<dyn std::error::Error>> {
    use std::fs;
    use std::io;
    use std::os::unix::fs::FileTypeExt;

    use tokio::net::UnixListener;
    use tokio_stream::wrappers::UnixListenerStream;

    // Remove a socket file left at this path, but refuse to delete anything
    // that is not a socket.
    match fs::metadata(path) {
        Ok(socket_meta) => {
            if !socket_meta.file_type().is_socket() {
                return Err(Box::new(io::Error::new(
                    io::ErrorKind::AlreadyExists,
                    "the specified path points to an existing non-socket file",
                )));
            }
            fs::remove_file(path)?;
        }
        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
        Err(e) => return Err(Box::new(e)),
    }

    // `Path::parent` returns `None` for a path without a parent (e.g. an
    // empty path). Skip directory creation in that case instead of panicking;
    // the subsequent bind reports the invalid path as a regular error.
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_time()
        .enable_io()
        .build()?;

    rt.block_on(async move {
        // The listener is bound inside the runtime context.
        let uds = UnixListener::bind(path)?;
        let uds_stream = UnixListenerStream::new(uds);

        let service =
            Server::builder().add_service(simulation_server::SimulationServer::new(service));

        match signal {
            Some(signal) => {
                service
                    .serve_with_incoming_shutdown(uds_stream, signal)
                    .await?
            }
            None => service.serve_with_incoming(uds_stream).await?,
        };

        Ok(())
    })
}