wick-component-cli 0.3.0

Simple implementation of a wick CLI interface for Rust components.
Documentation
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::str::FromStr;

use tokio::sync::mpsc::Sender;
use tonic::transport::{Certificate, Identity, Server};
use tracing::{debug, error, info, trace};
use wick_config::FetchOptions;
use wick_invocation_server::InvocationServer;
use wick_rpc::rpc::invocation_service_server::InvocationServiceServer;

use super::{Result, ServerMessage};
use crate::options::ServerOptions;

pub(super) async fn start_rpc_server(
  options: &ServerOptions,
  svc: InvocationServiceServer<InvocationServer>,
) -> Result<(SocketAddr, Sender<ServerMessage>)> {
  debug!("initializing RPC server");
  let port = options.port.unwrap_or(0);
  let address = options.address.unwrap_or(Ipv4Addr::from_str("127.0.0.1")?);

  let socket = tokio::net::TcpSocket::new_v4()?;
  socket.bind(SocketAddr::new(IpAddr::V4(address), port))?;
  let addr = socket.local_addr()?;

  trace!(
    address = %addr,
    port = addr.port(),
    "RPC server address"
  );

  socket.set_reuseaddr(true).unwrap();
  #[cfg(not(target_os = "windows"))]
  socket.set_reuseport(true).unwrap();
  let listener = socket.listen(512).unwrap();

  let stream = tokio_stream::wrappers::TcpListenerStream::new(listener);

  #[cfg(feature = "reflection")]
  let reflection = tonic_reflection::server::Builder::configure()
    .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
    .build()
    .unwrap();

  let mut builder = Server::builder();

  let fetch_opts = FetchOptions::default();

  if let (Some(pem), Some(key)) = (&options.pem, &options.key) {
    let server_pem = pem.bytes(&fetch_opts).await?;
    let server_key = key.bytes(&fetch_opts).await?;
    let identity = Identity::from_pem(&*server_pem, &*server_key);
    let mut tls = tonic::transport::ServerTlsConfig::new().identity(identity);

    if let Some(ca) = &options.ca {
      debug!(ca = %ca, "RPC: Adding CA root");
      let ca_pem = ca.bytes(&fetch_opts).await?;
      let ca = Certificate::from_pem(&*ca_pem);
      tls = tls.client_ca_root(ca);
    }

    builder = builder.tls_config(tls)?;
  } else if let Some(ca) = &options.ca {
    debug!(ca = %ca,"RPC: Adding CA root");
    let ca_pem = ca.bytes(&fetch_opts).await?;
    let ca = Certificate::from_pem(&*ca_pem);
    let tls = tonic::transport::ServerTlsConfig::new().client_ca_root(ca);
    builder = builder.tls_config(tls)?;
  }

  #[cfg(feature = "reflection")]
  let builder = builder.add_service(svc.clone()).add_service(reflection);
  #[cfg(not(feature = "reflection"))]
  let builder = builder.add_service(svc);

  let (tx, mut rx) = tokio::sync::mpsc::channel::<ServerMessage>(1);
  let server = builder.serve_with_incoming_shutdown(stream, async move {
    rx.recv().await;
    info!("received RPC shutdown message.");
  });

  tokio::spawn(async move {
    info!("starting RPC server");
    if let Err(e) = server.await {
      error!("error running RPC server: {}", e);
    };
    info!("RPC server shut down");
  });
  Ok((addr, tx))
}